From 77a8d99cab8bf5fa99b948ee38ac75172f6d734e Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Mon, 6 May 2024 22:53:07 +0800 Subject: [PATCH] add endpoint --- apps/endpoint/.gitignore | 19 ++ apps/endpoint/LICENSE | 191 +++++++++++++++ apps/endpoint/README.md | 9 + apps/endpoint/include/endpoint.hrl | 76 ++++++ apps/endpoint/rebar.config | 7 + apps/endpoint/src/endpoint.app.src | 15 ++ apps/endpoint/src/endpoint.erl | 363 ++++++++++++++++++++++++++++ apps/endpoint/src/endpoint_app.erl | 20 ++ apps/endpoint/src/endpoint_sup.erl | 35 +++ apps/endpoint/src/gen_endpoint.erl | 374 +++++++++++++++++++++++++++++ apps/iot/src/iot.app.src | 1 + 11 files changed, 1110 insertions(+) create mode 100644 apps/endpoint/.gitignore create mode 100644 apps/endpoint/LICENSE create mode 100644 apps/endpoint/README.md create mode 100644 apps/endpoint/include/endpoint.hrl create mode 100644 apps/endpoint/rebar.config create mode 100644 apps/endpoint/src/endpoint.app.src create mode 100644 apps/endpoint/src/endpoint.erl create mode 100644 apps/endpoint/src/endpoint_app.erl create mode 100644 apps/endpoint/src/endpoint_sup.erl create mode 100644 apps/endpoint/src/gen_endpoint.erl diff --git a/apps/endpoint/.gitignore b/apps/endpoint/.gitignore new file mode 100644 index 0000000..f1c4554 --- /dev/null +++ b/apps/endpoint/.gitignore @@ -0,0 +1,19 @@ +.rebar3 +_* +.eunit +*.o +*.beam +*.plt +*.swp +*.swo +.erlang.cookie +ebin +log +erl_crash.dump +.rebar +logs +_build +.idea +*.iml +rebar3.crashdump +*~ diff --git a/apps/endpoint/LICENSE b/apps/endpoint/LICENSE new file mode 100644 index 0000000..8ed2215 --- /dev/null +++ b/apps/endpoint/LICENSE @@ -0,0 +1,191 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + Copyright 2024, anlicheng <244108715@qq.com>. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + diff --git a/apps/endpoint/README.md b/apps/endpoint/README.md new file mode 100644 index 0000000..327372e --- /dev/null +++ b/apps/endpoint/README.md @@ -0,0 +1,9 @@ +endpoint +===== + +An OTP application + +Build +----- + + $ rebar3 compile diff --git a/apps/endpoint/include/endpoint.hrl b/apps/endpoint/include/endpoint.hrl new file mode 100644 index 0000000..b02b241 --- /dev/null +++ b/apps/endpoint/include/endpoint.hrl @@ -0,0 +1,76 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2024, +%%% @doc +%%% +%%% @end +%%% Created : 06. 5月 2024 18:17 +%%%------------------------------------------------------------------- +-author("anlicheng"). + +-record(http_endpoint, { + url = <<>> :: binary(), + pool_size = 10 :: integer() +}). + +-record(ws_endpoint, { + url = <<>> :: binary() +}). + +-record(mqtt_endpoint, { + host = <<>> :: binary(), + port = 0 :: integer(), + username = <<>> :: binary(), + password = <<>> :: binary(), + topic = <<>> :: binary(), + qos = 0 :: integer() +}). + +-record(kafka_endpoint, { + username = <<>> :: binary(), + password = <<>> :: binary(), + bootstrap_servers = [] :: [binary()], + topic = <<>> :: binary() +}). + +-record(mysql_endpoint, { + host = <<>> :: binary(), + port = 0 :: integer(), + username = <<>> :: binary(), + password = <<>> :: binary(), + database = <<>> :: binary(), + table_name = <<>> :: binary(), + pool_size = 10 :: integer() +}). + +-record(endpoint, { + id :: integer(), + %% 不同的对端名字要唯一 + name = <<>> :: binary(), + %% 标题描述 + title = <<>> :: binary(), + mapper = <<>> :: binary(), + %% 数据转换规则,基于 + %% fun(LocationCode :: binary(), Fields :: [{<<"key">> => <<>>, <<"value">> => <<>>, <<"unit">> => <<>>}]) + %% fun(LocationCode :: binary(), Fields :: [{<<"key">> => <<>>, <<"value">> => <<>>, <<"unit">> => <<>>}], Timestamp :: integer()) + mapper_fun = fun(_, Fields) -> Fields end :: fun(), + %% 配置项, 格式: #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}} + config = #http_endpoint{} :: #http_endpoint{} | #ws_endpoint{} | #mqtt_endpoint{} | #kafka_endpoint{} | #mysql_endpoint{}, + %% 更新时间 + updated_at = 0 :: integer(), + %% 创建时间 + created_at = 0 :: integer() +}). + +-record(north_data, { + id :: integer(), + location_code, + fields, + timestamp :: integer() +}). + +-record(post_data, { + id, + location_code, + body +}). \ No newline at end of file diff --git a/apps/endpoint/rebar.config b/apps/endpoint/rebar.config new file mode 100644 index 0000000..3f80bb9 --- /dev/null +++ b/apps/endpoint/rebar.config @@ -0,0 +1,7 @@ +{erl_opts, [debug_info]}. +{deps, []}. + +{shell, [ + % {config, "config/sys.config"}, + {apps, [endpoint]} +]}. diff --git a/apps/endpoint/src/endpoint.app.src b/apps/endpoint/src/endpoint.app.src new file mode 100644 index 0000000..30e33cc --- /dev/null +++ b/apps/endpoint/src/endpoint.app.src @@ -0,0 +1,15 @@ +{application, endpoint, + [{description, "An OTP application"}, + {vsn, "0.1.0"}, + {registered, []}, + {mod, {endpoint_app, []}}, + {applications, + [kernel, + stdlib + ]}, + {env,[]}, + {modules, []}, + + {licenses, ["Apache-2.0"]}, + {links, []} + ]}. diff --git a/apps/endpoint/src/endpoint.erl b/apps/endpoint/src/endpoint.erl new file mode 100644 index 0000000..adf3c51 --- /dev/null +++ b/apps/endpoint/src/endpoint.erl @@ -0,0 +1,363 @@ + +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 06. 7月 2023 12:02 +%%%------------------------------------------------------------------- +-module(endpoint). +-include("endpoint.hrl"). + +-behaviour(gen_statem). + +%% API +-export([start_link/2]). +-export([get_name/1, get_pid/1, forward/4, get_stat/1, reload/2, clean_up/1, get_mapper_fun/1]). + +%% gen_statem callbacks +-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). + +%% 消息重发间隔 +-define(RETRY_INTERVAL, 5000). + +-record(state, { + endpoint, + mp, + postman_pid :: undefined | pid(), + %% 队列 + queue, + %% 定时器 + timer_map = #{}, + %% 窗口大小,允许最大的未确认消息数 + window_size = 10, + %% 未确认的消息数 + flight_num = 0, + %% 记录成功处理的消息数 + acc_num = 0 +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec get_name(Name :: binary() | #endpoint{}) -> atom(). +get_name(#endpoint{name = Name}) when is_binary(Name) -> + get_name(Name); +get_name(EndpointName) when is_binary(EndpointName) -> + binary_to_atom(<<"iot_endpoint:", EndpointName/binary>>). + +-spec get_pid(Name :: binary()) -> undefined | pid(). +get_pid(Name) when is_binary(Name) -> + whereis(get_name(Name)). + +-spec forward(Pid :: undefined | pid(), LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return(). +forward(undefined, _, _, _) -> + ok; +forward(Pid, LocationCode, Fields, Timestamp) when is_pid(Pid), is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) -> + gen_statem:cast(Pid, {forward, LocationCode, Fields, Timestamp}). + +reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) -> + gen_statem:cast(Pid, {reload, NEndpoint}). + +-spec get_stat(Pid :: pid()) -> {ok, Stat :: #{}}. +get_stat(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, get_stat, 5000). + +-spec clean_up(Pid :: pid()) -> ok. +clean_up(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, clean_up, 5000). + +-spec get_mapper_fun(Pid :: pid()) -> fun(). +get_mapper_fun(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, get_mapper_fun). + +%% @doc Creates a gen_statem process which calls Module:init/1 to +%% initialize. To ensure a synchronized start-up procedure, this +%% function does not return until Module:init/1 has returned. +start_link(Name, Endpoint = #endpoint{}) -> + gen_statem:start_link({local, Name}, ?MODULE, [Endpoint], []). + +%%%=================================================================== +%%% gen_statem callbacks +%%%=================================================================== + +%% @private +%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or +%% gen_statem:start_link/[3,4], this function is called by the new +%% process to initialize. +init([Endpoint = #endpoint{matcher = Regexp}]) -> + erlang:process_flag(trap_exit, true), + + %% 编译正则表达式 + {ok, MP} = re:compile(Regexp), + %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 + erlang:start_timer(0, self(), create_postman), + + {ok, disconnected, #state{endpoint = Endpoint, mp = MP, queue = queue:new(), postman_pid = undefined}}. + +%% @private +%% @doc This function is called by a gen_statem when it needs to find out +%% the callback mode of the callback module. +callback_mode() -> + handle_event_function. + +%% @private +%% @doc There should be one instance of this function for each possible +%% state name. If callback_mode is state_functions, one of these +%% functions is called when gen_statem receives and event from +%% call/2, cast/2, or as a normal process message. + +%% 重新加载新的终端配置 +handle_event(cast, {reload, NEndpoint}, disconnected, State = #state{endpoint = Endpoint}) -> + lager:warning("[iot_endpoint] state_name: disconnected, reload endpoint, old: ~p, new: ~p", [Endpoint, NEndpoint]), + {keep_state, State#state{endpoint = NEndpoint}}; + +handle_event(cast, {reload, NEndpoint = #endpoint{name = Name}}, connected, State = #state{endpoint = Endpoint, timer_map = TimerMap, postman_pid = PostmanPid}) -> + lager:debug("[iot_endpoint] state_name: connected, reload endpoint, old: ~p, new: ~p", [Endpoint, NEndpoint]), + case config_equals(NEndpoint#endpoint.config, Endpoint#endpoint.config) of + true -> + lager:debug("[iot_endpoint] reload endpoint: ~p, config equals", [Name]), + {keep_state, State#state{endpoint = NEndpoint}}; + false -> + %% 解除和postman的link关系 + unlink(PostmanPid), + %% 关闭postman进程 + catch PostmanPid ! stop, + %% 未确认的消息需要暂存 + lists:foreach(fun({_, TimerRef}) -> catch erlang:cancel_timer(TimerRef) end, maps:to_list(TimerMap)), + %% 重新建立新的postman + erlang:start_timer(0, self(), create_postman), + + {next_state, disconnected, State#state{endpoint = NEndpoint, timer_map = maps:new(), postman_pid = undefined}} + end; + +handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{mp = MP, queue = Q, window_size = WindowSize, flight_num = FlightNum, endpoint = #endpoint{name = Name}}) -> + case re:run(LocationCode, MP, [{capture, all, list}]) of + nomatch -> + {keep_state, State}; + {match, _} -> + lager:debug("[iot_endpoint] name: ~p, match location_code: ~p", [Name, LocationCode]), + Q1 = queue:in(#north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}, Q), + %% 避免不必要的内部消息 + Actions = case StateName =:= connected andalso FlightNum < WindowSize of + true -> [{next_event, info, fetch_next}]; + false -> [] + end, + {keep_state, State#state{queue = Q1}, Actions} + end; + +%% 触发读取下一条数据 +handle_event(info, fetch_next, disconnected, State = #state{endpoint = #endpoint{name = Name}}) -> + lager:debug("[iot_endpoint] fetch_next endpoint: ~p, postman offline, data in queue", [Name]), + {keep_state, State}; +handle_event(info, fetch_next, connected, State = #state{flight_num = FlightNum, window_size = WindowSize}) when FlightNum >= WindowSize -> + {keep_state, State}; +handle_event(info, fetch_next, connected, State = #state{queue = Q, endpoint = #endpoint{name = Name}, timer_map = TimerMap, flight_num = FlightNum}) -> + case queue:out(Q) of + {{value, NorthData = #north_data{id = Id}}, Q1} -> + lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]), + case do_post(NorthData, State) of + error -> + {keep_state, State}; + {ok, TimerRef} -> + {keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap), flight_num = FlightNum + 1}} + end; + {empty, Q1} -> + {keep_state, State#state{queue = Q1}} + end; + +%% 收到确认消息 +handle_event(info, {ack, Id}, StateName, State = #state{tab_name = TabName, endpoint = #endpoint{name = Name}, timer_map = TimerMap, acc_num = AccNum, flight_num = FlightNum}) -> + ok = mnesia_queue:delete(TabName, Id), + lager:debug("[iot_endpoint] endpoint: ~p, get ack: ~p, delete from mnesia", [Name, Id]), + Actions = case StateName =:= connected of + true -> [{next_event, info, fetch_next}]; + false -> [] + end, + {keep_state, State#state{timer_map = remove_timer(Id, TimerMap), acc_num = AccNum + 1, flight_num = FlightNum - 1}, Actions}; + +%% 收到重发过期请求 +handle_event(info, {timeout, _, {repost_ticker, NorthData = #north_data{id = Id}}}, connected, State = #state{endpoint = #endpoint{name = Name}, timer_map = TimerMap}) -> + lager:debug("[iot_endpoint] endpoint: ~p, repost data: ~p", [Name, Id]), + case do_post(NorthData, State) of + error -> + {keep_state, State}; + {ok, TimerRef} -> + {keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}} + end; + +%% 离线时,忽略超时逻辑 +handle_event(info, {timeout, _, {repost_ticker, _}}, disconnected, State) -> + {keep_state, State}; + +handle_event(info, {timeout, _, create_postman}, disconnected, State = #state{endpoint = Endpoint = #endpoint{name = Name, config = Config}, window_size = WindowSize}) -> + lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Name]), + try + {ok, PostmanPid} = create_postman(Endpoint), + %% 最多允许window_size + Actions = lists:map(fun(_) -> {next_event, info, fetch_next} end, lists:seq(1, WindowSize)), + {next_state, connected, State#state{endpoint = Endpoint, postman_pid = PostmanPid, timer_map = maps:new(), flight_num = 0}, Actions} + catch _:Error -> + lager:warning("[iot_endpoint] endpoint: ~p, config: ~p, create postman get error: ~p", [Name, Config, Error]), + erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), + + {keep_state, State#state{endpoint = Endpoint, postman_pid = undefined}} + end; + +%% 删除时需要清理 +handle_event({call, From}, clean_up, _, State = #state{tab_name = TabName}) -> + mnesia:delete_table(TabName), + {keep_state, State, [{reply, From, ok}]}; + +handle_event({call, From}, get_mapper_fun, _, State = #state{endpoint = #endpoint{mapper_fun = F}}) -> + {keep_state, State, [{reply, From, F}]}; + +%% 获取当前统计信息 +handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum, tab_name = TabName}) -> + Stat = #{ + <<"acc_num">> => AccNum, + <<"queue_num">> => mnesia_queue:table_size(TabName), + <<"state_name">> => atom_to_binary(StateName) + }, + {keep_state, State, [{reply, From, Stat}]}; + +%% postman进程挂掉时,重新建立新的 +handle_event(info, {'EXIT', PostmanPid, Reason}, connected, State = #state{endpoint = #endpoint{name = Name}, timer_map = TimerMap, postman_pid = PostmanPid}) -> + lager:warning("[iot_endpoint] endpoint: ~p, postman exited with reason: ~p", [Name, Reason]), + lists:foreach(fun({_, TimerRef}) -> catch erlang:cancel_timer(TimerRef) end, maps:to_list(TimerMap)), + erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), + + {next_state, disconnected, State#state{timer_map = maps:new(), postman_pid = undefined}}; + +%% @private +%% @doc If callback_mode is handle_event_function, then whenever a +%% gen_statem receives an event from call/2, cast/2, or as a normal +%% process message, this function is called. +handle_event(EventType, Event, StateName, State) -> + lager:warning("[iot_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]), + {keep_state, State}. + +%% @private +%% @doc This function is called by a gen_statem when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_statem terminates with +%% Reason. The return value is ignored. +terminate(Reason, _StateName, #state{endpoint = #endpoint{name = Name}}) -> + lager:debug("[iot_endpoint] endpoint: ~p, terminate with reason: ~p", [Name, Reason]), + ok. + +%% @private +%% @doc Convert process state when code is changed +code_change(_OldVsn, StateName, State = #state{}, _Extra) -> + {ok, StateName, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec remove_timer(Id :: integer(), TimerMap :: #{}) -> NTimerMap :: #{}. +remove_timer(Id, TimerMap) when is_integer(Id), is_map(TimerMap) -> + case maps:take(Id, TimerMap) of + error -> + TimerMap; + {TimerRef, NTimerMap} -> + is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), + NTimerMap + end. + +%% 对http和https协议的支持 +create_postman(#endpoint{config = #http_endpoint{url = Url, pool_size = PoolSize}}) -> + WorkerArgs = [{url, Url}], + broker_postman:start_link(http_postman, WorkerArgs, PoolSize); + +%% 对mqtt协议的支持, 只需要建立单个链接 +create_postman(#endpoint{name = Name, config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}}) -> + Node = atom_to_binary(node()), + ClientId = <<"mqtt-client-", Node/binary, "-", Name/binary>>, + Opts = [ + {clientid, ClientId}, + {host, binary_to_list(Host)}, + {port, Port}, + {tcp_opts, []}, + {username, binary_to_list(Username)}, + {password, binary_to_list(Password)}, + {keepalive, 86400}, + {auto_ack, true}, + {connect_timeout, 5000}, + {proto_ver, v5}, + {retry_interval, 5000} + ], + + mqtt_postman:start_link(Opts, Topic, Qos); + +%% 对mysql协议的支持 +create_postman(#endpoint{config = #mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName, pool_size = PoolSize}}) -> + WorkerArgs = [ + {mysql_opts, [ + {host, binary_to_list(Host)}, + {port, Port}, + {user, binary_to_list(Username)}, + {password, binary_to_list(Password)}, + {keep_alive, true}, + {database, binary_to_list(Database)}, + {queries, [<<"set names utf8">>]} + ]}, + {table, TableName} + ], + broker_postman:start_link(mysql_postman, WorkerArgs, PoolSize); +create_postman(#endpoint{}) -> + throw(<<"not supported">>). + +-spec do_post(NorthData :: #north_data{}, State :: #state{}) -> error | {ok, TimerRef :: reference()}. +do_post(NorthData = #north_data{id = Id}, #state{postman_pid = PostmanPid, tab_name = TabName, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}}) -> + lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]), + case safe_invoke_mapper(MapperFun, NorthData) of + {ok, Body} -> + PostmanPid ! {post, self(), make_post_data(NorthData, Body)}, + %% 重发机制, 在发送的过程中mapper可能会改变 + TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), + {ok, TimerRef}; + {error, Error} -> + lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p, message discard", [Name, Error]), + mnesia_queue:delete(TabName, Id), + error; + ignore -> + lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore, messge discard", [Name]), + mnesia_queue:delete(TabName, Id), + error + end. + +-spec safe_invoke_mapper(MapperFun :: fun(), NorthData :: #north_data{}) -> + {ok, Body :: any()} | ignore | {error, Reason :: any()}. +safe_invoke_mapper(MapperFun, #north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}) -> + try + if + is_function(MapperFun, 2) -> + MapperFun(LocationCode, Fields); + is_function(MapperFun, 3) -> + MapperFun(LocationCode, Fields, Timestamp) + end + catch _:Error -> + {error, Error} + end. + +-spec make_post_data(NorthData :: #north_data{}, Body :: any()) -> PostData :: #post_data{}. +make_post_data(#north_data{id = Id, location_code = LocationCode}, Body) -> + #post_data{id = Id, location_code = LocationCode, body = Body}. + + +-spec config_equals(any(), any()) -> boolean(). +config_equals(#http_endpoint{url = Url}, #http_endpoint{url = Url}) -> + true; +config_equals(#ws_endpoint{url = Url}, #ws_endpoint{url = Url}) -> + true; +config_equals(#kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}, + #kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}) -> + true; +config_equals(#mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}, + #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}) -> + true; +config_equals(_, _) -> + false. \ No newline at end of file diff --git a/apps/endpoint/src/endpoint_app.erl b/apps/endpoint/src/endpoint_app.erl new file mode 100644 index 0000000..c972e48 --- /dev/null +++ b/apps/endpoint/src/endpoint_app.erl @@ -0,0 +1,20 @@ +%%%------------------------------------------------------------------- +%% @doc endpoint public API +%% @end +%%%------------------------------------------------------------------- + +-module(endpoint_app). + +-behaviour(application). + +-export([start/2, stop/1]). + +start(_StartType, _StartArgs) -> + lager:debug("[endpoint] started"), + + endpoint_sup:start_link(). + +stop(_State) -> + ok. + +%% internal functions diff --git a/apps/endpoint/src/endpoint_sup.erl b/apps/endpoint/src/endpoint_sup.erl new file mode 100644 index 0000000..8d8861b --- /dev/null +++ b/apps/endpoint/src/endpoint_sup.erl @@ -0,0 +1,35 @@ +%%%------------------------------------------------------------------- +%% @doc endpoint top level supervisor. +%% @end +%%%------------------------------------------------------------------- + +-module(endpoint_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%% sup_flags() = #{strategy => strategy(), % optional +%% intensity => non_neg_integer(), % optional +%% period => pos_integer()} % optional +%% child_spec() = #{id => child_id(), % mandatory +%% start => mfargs(), % mandatory +%% restart => restart(), % optional +%% shutdown => shutdown(), % optional +%% type => worker(), % optional +%% modules => modules()} % optional +init([]) -> + SupFlags = #{strategy => one_for_all, + intensity => 0, + period => 1}, + ChildSpecs = [], + {ok, {SupFlags, ChildSpecs}}. + +%% internal functions diff --git a/apps/endpoint/src/gen_endpoint.erl b/apps/endpoint/src/gen_endpoint.erl new file mode 100644 index 0000000..9951bf1 --- /dev/null +++ b/apps/endpoint/src/gen_endpoint.erl @@ -0,0 +1,374 @@ + +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 06. 7月 2023 12:02 +%%%------------------------------------------------------------------- +-module(gen_endpoint). + +-include("endpoint.hrl"). +-behaviour(gen_statem). + +%% API +-export([start_link/3]). +-export([get_name/1, get_pid/1, forward/4, get_stat/1, reload/2, clean_up/1, get_mapper_fun/1]). + +%% gen_statem callbacks +-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). + +%% 消息重发间隔 +-define(RETRY_INTERVAL, 5000). + +-record(state, { + endpoint, + mod :: atom(), + mod_state :: any(), + %% ets存储 + last_id = 1 :: integer(), + cursor :: integer(), + tid :: reference(), + %% 定时器 + timer_map = #{}, + %% 窗口大小,允许最大的未确认消息数 + window_size = 10, + %% 未确认的消息数 + flight_num = 0, + %% 记录成功处理的消息数 + acc_num = 0 +}). + +-callback init(Args :: term()) -> + {ok, State :: term()} | {ok, State :: term(), timeout() | hibernate | {continue, term()}} | + {stop, Reason :: term()} | ignore. + +-callback handle_data(Request :: term(), From :: from(), + State :: term()) -> + {reply, Reply :: term(), NewState :: term()} | + {reply, Reply :: term(), NewState :: term(), timeout() | hibernate | {continue, term()}} | + {noreply, NewState :: term()} | + {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} | + {stop, Reason :: term(), Reply :: term(), NewState :: term()} | + {stop, Reason :: term(), NewState :: term()}. + +-callback handle_cast(Request :: term(), State :: term()) -> + {noreply, NewState :: term()} | + {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} | + {stop, Reason :: term(), NewState :: term()}. +-callback handle_info(Info :: timeout | term(), State :: term()) -> + {noreply, NewState :: term()} | + {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} | + {stop, Reason :: term(), NewState :: term()}. +-callback handle_continue(Info :: term(), State :: term()) -> + {noreply, NewState :: term()} | + {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} | + {stop, Reason :: term(), NewState :: term()}. +-callback terminate(Reason :: (normal | shutdown | {shutdown, term()} | +term()), + State :: term()) -> + term(). +-callback code_change(OldVsn :: (term() | {down, term()}), State :: term(), + Extra :: term()) -> + {ok, NewState :: term()} | {error, Reason :: term()}. + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec get_name(Name :: binary() | #endpoint{}) -> atom(). +get_name(#endpoint{name = Name}) when is_binary(Name) -> + get_name(Name); +get_name(EndpointName) when is_binary(EndpointName) -> + binary_to_atom(<<"endpoint:", EndpointName/binary>>). + +-spec get_pid(Name :: binary()) -> undefined | pid(). +get_pid(Name) when is_binary(Name) -> + whereis(get_name(Name)). + +-spec forward(Pid :: undefined | pid(), LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return(). +forward(undefined, _, _, _) -> + ok; +forward(Pid, LocationCode, Fields, Timestamp) when is_pid(Pid), is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) -> + gen_statem:cast(Pid, {forward, LocationCode, Fields, Timestamp}). + +reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) -> + gen_statem:cast(Pid, {reload, NEndpoint}). + +-spec get_stat(Pid :: pid()) -> {ok, Stat :: #{}}. +get_stat(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, get_stat, 5000). + +-spec clean_up(Pid :: pid()) -> ok. +clean_up(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, clean_up, 5000). + +-spec get_mapper_fun(Pid :: pid()) -> fun(). +get_mapper_fun(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, get_mapper_fun). + +%% @doc Creates a gen_statem process which calls Module:init/1 to +%% initialize. To ensure a synchronized start-up procedure, this +%% function does not return until Module:init/1 has returned. +start_link(Name, Mod, Endpoint = #endpoint{}) -> + gen_statem:start_link({local, Name}, ?MODULE, [Mod, Endpoint], []). + +%%%=================================================================== +%%% gen_statem callbacks +%%%=================================================================== + +%% @private +%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or +%% gen_statem:start_link/[3,4], this function is called by the new +%% process to initialize. +init([Mod, Endpoint = #endpoint{id = Id}]) -> + erlang:process_flag(trap_exit, true), + + %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 + erlang:start_timer(0, self(), create_postman), + + %% 初始化存储 + EtsName = list_to_atom("endpoint_ets:" ++ integer_to_list(Id)), + Tid = ets:new(EtsName, [ordered_set, private]), + + case Mod:init(Endpoint) of + {ok, StateName, MState} -> + {ok, StateName, #state{endpoint = Endpoint, mod = Mod, tid = Tid, mod_state = MState}}; + {stop, Reason} -> + {stop, Reason} + end. + +%% @private +%% @doc This function is called by a gen_statem when it needs to find out +%% the callback mode of the callback module. +callback_mode() -> + handle_event_function. + +%% @private +%% @doc There should be one instance of this function for each possible +%% state name. If callback_mode is state_functions, one of these +%% functions is called when gen_statem receives and event from +%% call/2, cast/2, or as a normal process message. + +%% TODO 不支持, 重新加载新的终端配置, 对应的办法是关闭后重启 + +handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{tid = Tid, last_id = LastId, window_size = WindowSize, flight_num = FlightNum, endpoint = #endpoint{name = Name}}) -> + lager:debug("[iot_endpoint] name: ~p, match location_code: ~p", [Name, LocationCode]), + NorthData = #north_data{id = LastId, location_code = LocationCode, fields = Fields, timestamp = Timestamp}, + true = ets:insert(Tid, NorthData), + %% 避免不必要的内部消息 + Actions = case StateName =:= connected andalso FlightNum < WindowSize of + true -> [{next_event, info, fetch_next}]; + false -> [] + end, + {keep_state, State#state{last_id = LastId + 1}, Actions}; + +%% 触发读取下一条数据 +handle_event(info, fetch_next, connected, State = #state{flight_num = FlightNum, window_size = WindowSize}) when FlightNum >= WindowSize -> + {keep_state, State}; +handle_event(info, fetch_next, connected, State = #state{tid = Tid, cursor = Cursor, endpoint = #endpoint{name = Name}, timer_map = TimerMap, flight_num = FlightNum}) -> + case ets:next(Tid, Cursor) of + '$end_of_table' -> + {keep_state, State}; + NKey -> + [NorthData = #north_data{id = Id}] = ets:lookup(Tid, NKey), + lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]), + case do_post(NorthData, State) of + error -> + {keep_state, State}; + {ok, TimerRef} -> + {keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap), flight_num = FlightNum + 1}} + end + end; + +%% 收到确认消息 +handle_event(info, {ack, Id}, StateName, State = #state{tid = Tid, endpoint = #endpoint{name = Name}, timer_map = TimerMap, acc_num = AccNum, flight_num = FlightNum}) -> + true = ets:delete(Tid, Id), + + lager:debug("[iot_endpoint] endpoint: ~p, get ack: ~p, delete from mnesia", [Name, Id]), + Actions = case StateName =:= connected of + true -> [{next_event, info, fetch_next}]; + false -> [] + end, + {keep_state, State#state{timer_map = remove_timer(Id, TimerMap), acc_num = AccNum + 1, flight_num = FlightNum - 1}, Actions}; + +%% 收到重发过期请求 +handle_event(info, {timeout, _, {repost_ticker, NorthData = #north_data{id = Id}}}, connected, + State = #state{endpoint = #endpoint{name = Name}, timer_map = TimerMap}) -> + + lager:debug("[iot_endpoint] endpoint: ~p, repost data: ~p", [Name, Id]), + case do_post(NorthData, State) of + error -> + {keep_state, State}; + {ok, TimerRef} -> + {keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}} + end; + +%% todo 离线时,忽略超时逻辑 +handle_event(info, {timeout, _, {repost_ticker, _}}, disconnected, State) -> + {keep_state, State}; + +handle_event(info, {timeout, _, create_postman}, disconnected, + State = #state{endpoint = Endpoint = #endpoint{name = Name}, window_size = WindowSize, mod = Mod, mod_state = ModState}) -> + lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Name]), + case Mod:create_postman(Endpoint, ModState) of + {ok, NModState} -> + %% 最多允许window_size + Actions = lists:map(fun(_) -> {next_event, info, fetch_next} end, lists:seq(1, WindowSize)), + {next_state, connected, State#state{endpoint = Endpoint, timer_map = maps:new(), flight_num = 0, mod_state = NModState}, Actions}; + error -> + {keep_state, State} + end; + +handle_event({call, From}, get_mapper_fun, _, State = #state{endpoint = #endpoint{mapper_fun = F}}) -> + {keep_state, State, [{reply, From, F}]}; + +%% 获取当前统计信息 +handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum, tid = Tid}) -> + Stat = #{ + <<"acc_num">> => AccNum, + <<"queue_num">> => ets:info(Tid, size), + <<"state_name">> => atom_to_binary(StateName) + }, + {keep_state, State, [{reply, From, Stat}]}; + +%% postman进程挂掉时,重新建立新的 +handle_event(info, Info, connected, State = #state{endpoint = #endpoint{name = Name}, timer_map = TimerMap, mod = Mod, mod_state = ModState}) -> + case Mod:handle_info(Info, ModState) of + {} -> + ok + end, + + lists:foreach(fun({_, TimerRef}) -> catch erlang:cancel_timer(TimerRef) end, maps:to_list(TimerMap)), + erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), + + {next_state, disconnected, State#state{timer_map = maps:new()}}; + +%% @private +%% @doc If callback_mode is handle_event_function, then whenever a +%% gen_statem receives an event from call/2, cast/2, or as a normal +%% process message, this function is called. +handle_event(EventType, Event, StateName, State) -> + lager:warning("[iot_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]), + {keep_state, State}. + +%% @private +%% @doc This function is called by a gen_statem when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_statem terminates with +%% Reason. The return value is ignored. +terminate(Reason, _StateName, #state{endpoint = #endpoint{name = Name}, timer_map = TimerMap}) -> + lager:debug("[iot_endpoint] endpoint: ~p, terminate with reason: ~p", [Name, Reason]), + lists:foreach(fun({_, TimerRef}) -> catch erlang:cancel_timer(TimerRef) end, maps:to_list(TimerMap)), + ok. + +%% @private +%% @doc Convert process state when code is changed +code_change(_OldVsn, StateName, State = #state{}, _Extra) -> + {ok, StateName, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec remove_timer(Id :: integer(), TimerMap :: #{}) -> NTimerMap :: #{}. +remove_timer(Id, TimerMap) when is_integer(Id), is_map(TimerMap) -> + case maps:take(Id, TimerMap) of + error -> + TimerMap; + {TimerRef, NTimerMap} -> + is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), + NTimerMap + end. + +%% 对http和https协议的支持 +create_postman(#endpoint{config = #http_endpoint{url = Url, pool_size = PoolSize}}) -> + WorkerArgs = [{url, Url}], + broker_postman:start_link(http_postman, WorkerArgs, PoolSize); + +%% 对mqtt协议的支持, 只需要建立单个链接 +create_postman(#endpoint{name = Name, config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}}) -> + Node = atom_to_binary(node()), + ClientId = <<"mqtt-client-", Node/binary, "-", Name/binary>>, + Opts = [ + {clientid, ClientId}, + {host, binary_to_list(Host)}, + {port, Port}, + {tcp_opts, []}, + {username, binary_to_list(Username)}, + {password, binary_to_list(Password)}, + {keepalive, 86400}, + {auto_ack, true}, + {connect_timeout, 5000}, + {proto_ver, v5}, + {retry_interval, 5000} + ], + + mqtt_postman:start_link(Opts, Topic, Qos); + +%% 对mysql协议的支持 +create_postman(#endpoint{config = #mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName, pool_size = PoolSize}}) -> + WorkerArgs = [ + {mysql_opts, [ + {host, binary_to_list(Host)}, + {port, Port}, + {user, binary_to_list(Username)}, + {password, binary_to_list(Password)}, + {keep_alive, true}, + {database, binary_to_list(Database)}, + {queries, [<<"set names utf8">>]} + ]}, + {table, TableName} + ], + broker_postman:start_link(mysql_postman, WorkerArgs, PoolSize); +create_postman(#endpoint{}) -> + throw(<<"not supported">>). + +-spec do_post(NorthData :: #north_data{}, State :: #state{}) -> error | {ok, TimerRef :: reference()}. +do_post(NorthData = #north_data{id = Id, location_code = LocationCode}, #state{tid = Tid, mod = Mod, mod_state = ModState, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}}) -> + lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]), + case safe_invoke_mapper(MapperFun, NorthData) of + {ok, Body} -> + Mod:handle_data({Id, LocationCode, Body}, ModState), + + %% 重发机制, 在发送的过程中mapper可能会改变 + TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), + {ok, TimerRef}; + {error, Error} -> + lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p, message discard", [Name, Error]), + ets:delete(Tid, Id), + error; + ignore -> + lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore, messge discard", [Name]), + ets:delete(Tid, Id), + error + end. + +-spec safe_invoke_mapper(MapperFun :: fun(), NorthData :: #north_data{}) -> + {ok, Body :: any()} | ignore | {error, Reason :: any()}. +safe_invoke_mapper(MapperFun, #north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}) -> + try + if + is_function(MapperFun, 2) -> + MapperFun(LocationCode, Fields); + is_function(MapperFun, 3) -> + MapperFun(LocationCode, Fields, Timestamp) + end + catch _:Error -> + {error, Error} + end. + +-spec config_equals(any(), any()) -> boolean(). +config_equals(#http_endpoint{url = Url}, #http_endpoint{url = Url}) -> + true; +config_equals(#ws_endpoint{url = Url}, #ws_endpoint{url = Url}) -> + true; +config_equals(#kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}, + #kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}) -> + true; +config_equals(#mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}, + #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}) -> + true; +config_equals(_, _) -> + false. \ No newline at end of file diff --git a/apps/iot/src/iot.app.src b/apps/iot/src/iot.app.src index 68e206f..4ace423 100644 --- a/apps/iot/src/iot.app.src +++ b/apps/iot/src/iot.app.src @@ -16,6 +16,7 @@ poolboy, mysql, esockd, + endpoint, mnesia, crypto, public_key,