From 5fd69f03471eb88a80c700a40c7f68c50af006ab Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 4 Jun 2025 10:35:36 +0800 Subject: [PATCH] add code --- apps/endpoint/src/endpoint.app.src | 4 +- apps/endpoint/src/endpoint.erl | 63 +++++++++ apps/endpoint/src/endpoint_buffer.erl | 136 ++++++++++++++++++ apps/endpoint/src/endpoint_http.erl | 127 +++++++++++++++++ apps/endpoint/src/endpoint_mqtt.erl | 194 ++++++++++++++++++++++++++ apps/endpoint/src/endpoint_mysql.erl | 186 ++++++++++++++++++++++++ apps/endpoint/src/endpoint_sup.erl | 31 +++- apps/endpoint/src/endpoint_timer.erl | 128 +++++++++++++++++ 8 files changed, 865 insertions(+), 4 deletions(-) create mode 100644 apps/endpoint/src/endpoint.erl create mode 100644 apps/endpoint/src/endpoint_buffer.erl create mode 100644 apps/endpoint/src/endpoint_http.erl create mode 100644 apps/endpoint/src/endpoint_mqtt.erl create mode 100644 apps/endpoint/src/endpoint_mysql.erl create mode 100644 apps/endpoint/src/endpoint_timer.erl diff --git a/apps/endpoint/src/endpoint.app.src b/apps/endpoint/src/endpoint.app.src index 30e33cc..b10c10f 100644 --- a/apps/endpoint/src/endpoint.app.src +++ b/apps/endpoint/src/endpoint.app.src @@ -4,7 +4,9 @@ {registered, []}, {mod, {endpoint_app, []}}, {applications, - [kernel, + [ + emqtt, + kernel, stdlib ]}, {env,[]}, diff --git a/apps/endpoint/src/endpoint.erl b/apps/endpoint/src/endpoint.erl new file mode 100644 index 0000000..b6697c5 --- /dev/null +++ b/apps/endpoint/src/endpoint.erl @@ -0,0 +1,63 @@ + +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 06. 7月 2023 12:02 +%%%------------------------------------------------------------------- +-module(endpoint). +-include("endpoint.hrl"). + +%% API +-export([start_link/1]). +-export([get_name/1, get_pid/1, forward/4, reload/2, clean_up/1]). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec start_link(Endpoint :: #endpoint{}) -> {'ok', pid()} | 'ignore' | {'error', term()}. +start_link(Endpoint = #endpoint{id = Id, config = #http_endpoint{}}) -> + Name = get_name(Id), + endpoint_http:start_link(Name, Endpoint); +start_link(Endpoint = #endpoint{id = Id, config = #mqtt_endpoint{}}) -> + Name = get_name(Id), + endpoint_mqtt:start_link(Name, Endpoint); +start_link(Endpoint = #endpoint{id = Id, config = #mysql_endpoint{}}) -> + Name = get_name(Id), + endpoint_mysql:start_link(Name, Endpoint). + +-spec get_name(Id :: integer()) -> atom(). +get_name(Id) when is_integer(Id) -> + list_to_atom("endpoint:" ++ integer_to_list(Id)). + +-spec get_pid(Id :: integer()) -> undefined | pid(). +get_pid(Id) when is_integer(Id) -> + whereis(get_name(Id)). + +-spec forward(Pid :: undefined | pid(), LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return(). +forward(undefined, _, _, _) -> + ok; +forward(Pid, LocationCode, Fields, Timestamp) when is_pid(Pid), is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) -> + gen_server:cast(Pid, {forward, LocationCode, Fields, Timestamp}). + +reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) -> + gen_statem:cast(Pid, {reload, NEndpoint}). + +-spec clean_up(Pid :: pid()) -> ok. +clean_up(Pid) when is_pid(Pid) -> + gen_server:call(Pid, clean_up, 5000). + +-spec config_equals(any(), any()) -> boolean(). +config_equals(#http_endpoint{url = Url}, #http_endpoint{url = Url}) -> + true; +config_equals(#kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}, + #kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}) -> + true; +config_equals(#mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}, + #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}) -> + true; +config_equals(_, _) -> + false. \ No newline at end of file diff --git a/apps/endpoint/src/endpoint_buffer.erl b/apps/endpoint/src/endpoint_buffer.erl new file mode 100644 index 0000000..3ccc24b --- /dev/null +++ b/apps/endpoint/src/endpoint_buffer.erl @@ -0,0 +1,136 @@ + +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 06. 7月 2023 12:02 +%%%------------------------------------------------------------------- +-module(endpoint_buffer). + +-include("endpoint.hrl"). + +%% 消息重发间隔 +-define(RETRY_INTERVAL, 5000). + +-export([new/2, append/2, trigger_next/1, trigger_n/1, cleanup/1, ack/2, stat/1]). +-export_type([buffer/0]). + +-record(buffer, { + endpoint :: #endpoint{}, + next_id = 1 :: integer(), + %% 当前数据所在的游标 + cursor = 0 :: integer(), + %% ets存储的引用 + tid :: ets:tid(), + %% 定时器 + timer_pid :: pid(), + %% 窗口大小,允许最大的未确认消息数 + window_size = 10, + %% 未确认的消息数 + flight_num = 0, + %% 记录成功处理的消息数 + acc_num = 0 +}). + +-record(north_data, { + id :: integer(), + location_code, + fields, + timestamp :: integer() +}). + +-type buffer() :: #buffer{}. + +-spec new(Endpoint :: #endpoint{}, WindowSize :: integer()) -> Buffer :: #buffer{}. +new(Endpoint = #endpoint{id = Id}, WindowSize) when is_integer(WindowSize), WindowSize > 0 -> + %% 初始化存储 + EtsName = list_to_atom("endpoint_buffer_ets:" ++ integer_to_list(Id)), + Tid = ets:new(EtsName, [ordered_set, private]), + %% 定义重发器 + {ok, TimerPid} = endpoint_timer:start_link(?RETRY_INTERVAL), + + #buffer{cursor = 0, tid = Tid, timer_pid = TimerPid, endpoint = Endpoint, window_size = WindowSize}. + +-spec append(tuple(), Buffer :: #buffer{}) -> NBuffer :: #buffer{}. +append({LocationCode, Fields, Timestamp}, Buffer = #buffer{tid = Tid, next_id = NextId, window_size = WindowSize, flight_num = FlightNum}) -> + NorthData = #north_data{id = NextId, location_code = LocationCode, fields = Fields, timestamp = Timestamp}, + true = ets:insert(Tid, NorthData), + NBuffer = Buffer#buffer{next_id = NextId + 1}, + case FlightNum < WindowSize of + true -> + trigger_next(NBuffer); + false -> + NBuffer + end. + +-spec trigger_n(Buffer :: #buffer{}) -> NBuffer :: #buffer{}. +trigger_n(Buffer = #buffer{window_size = WindowSize}) -> + %% 最多允许window_size + lists:foldl(fun(_, Buffer0) -> trigger_next(Buffer0) end, Buffer, lists:seq(1, WindowSize)). + +%% 触发读取下一条数据 +-spec trigger_next(Buffer :: #buffer{}) -> NBuffer :: #buffer{}. +trigger_next(Buffer = #buffer{tid = Tid, cursor = Cursor, timer_pid = TimerPid, flight_num = FlightNum, endpoint = #endpoint{title = Title, mapper_fun = MapperFun}}) -> + case ets:next(Tid, Cursor) of + '$end_of_table' -> + Buffer; + NKey -> + [NorthData = #north_data{id = Id, location_code = LocationCode}] = ets:lookup(Tid, NKey), + %lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]), + %% 重发机制, 在发送的过程中mapper可能会改变 + case safe_invoke_mapper(MapperFun, NorthData) of + {ok, Body} -> + ReceiverPid = self(), + ReceiverPid ! {next_data, Id, LocationCode, Body}, + endpoint_timer:task(TimerPid, Id, fun() -> ReceiverPid ! {next_data, Id, LocationCode, Body} end), + + Buffer#buffer{flight_num = FlightNum + 1}; + {error, Error} -> + lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p, message discard", [Title, Error]), + ets:delete(Tid, Id), + Buffer; + ignore -> + lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore, messge discard", [Title]), + ets:delete(Tid, Id), + Buffer + end + end. + +-spec ack(Id :: integer(), Buffer :: #buffer{}) -> NBuffer :: #buffer{}. +ack(Id, Buffer = #buffer{tid = Tid, timer_pid = TimerPid, acc_num = AccNum, flight_num = FlightNum}) when is_integer(Id) -> + true = ets:delete(Tid, Id), + endpoint_timer:ack(TimerPid, Id), + trigger_next(Buffer#buffer{acc_num = AccNum + 1, flight_num = FlightNum - 1}). + +%% 获取当前统计信息 +-spec stat(Buffer :: #buffer{}) -> map(). +stat(#buffer{acc_num = AccNum, tid = Tid}) -> + #{ + <<"acc_num">> => AccNum, + <<"queue_num">> => ets:info(Tid, size) + }. + +-spec cleanup(Buffer :: #buffer{}) -> ok. +cleanup(#buffer{timer_pid = TimerPid}) -> + endpoint_timer:cleanup(TimerPid), + ok. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec safe_invoke_mapper(MapperFun :: fun(), NorthData :: #north_data{}) -> + {ok, Body :: any()} | ignore | {error, Reason :: any()}. +safe_invoke_mapper(MapperFun, #north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}) -> + try + if + is_function(MapperFun, 2) -> + MapperFun(LocationCode, Fields); + is_function(MapperFun, 3) -> + MapperFun(LocationCode, Fields, Timestamp) + end + catch _:Error -> + {error, Error} + end. \ No newline at end of file diff --git a/apps/endpoint/src/endpoint_http.erl b/apps/endpoint/src/endpoint_http.erl new file mode 100644 index 0000000..8da36da --- /dev/null +++ b/apps/endpoint/src/endpoint_http.erl @@ -0,0 +1,127 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2024, +%%% @doc +%%% +%%% @end +%%% Created : 07. 5月 2024 11:17 +%%%------------------------------------------------------------------- +-module(endpoint_http). +-author("anlicheng"). +-include("endpoint.hrl"). + +-behaviour(gen_server). + +%% API +-export([start_link/2]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + endpoint :: #endpoint{}, + buffer :: endpoint_buffer:buffer() +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link(Name :: atom(), Endpoint :: #endpoint{}) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link(Name, Endpoint = #endpoint{config = #http_endpoint{}}) when is_atom(Name) -> + gen_server:start_link({local, Name}, ?MODULE, [Endpoint], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([Endpoint]) -> + Buffer = endpoint_buffer:new(Endpoint, 10), + {ok, #state{endpoint = Endpoint, buffer = Buffer}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(get_stat, _From, State = #state{buffer = Buffer}) -> + Stat = endpoint_buffer:stat(Buffer), + {reply, {ok, Stat}, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({forward, LocationCode, Fields, Timestamp}, State = #state{buffer = Buffer}) -> + NBuffer = endpoint_buffer:append({LocationCode, Fields, Timestamp}, Buffer), + {noreply, State#state{buffer = NBuffer}}; + +handle_cast(cleanup, State = #state{buffer = Buffer}) -> + endpoint_buffer:cleanup(Buffer), + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({next_data, Id, _LocationCode, Body}, State = #state{buffer = Buffer, endpoint = #endpoint{config = #http_endpoint{url = Url}}}) -> + Headers = [ + {<<"content-type">>, <<"application/json">>} + ], + case hackney:request(post, Url, Headers, Body) of + {ok, 200, _, ClientRef} -> + {ok, RespBody} = hackney:body(ClientRef), + hackney:close(ClientRef), + lager:debug("[http_postman] url: ~p, response is: ~p", [Url, RespBody]), + NBuffer = endpoint_buffer:ack(Id, Buffer), + + {noreply, State#state{buffer = NBuffer}}; + {ok, HttpCode, _, ClientRef} -> + {ok, RespBody} = hackney:body(ClientRef), + hackney:close(ClientRef), + lager:debug("[http_postman] url: ~p, http_code: ~p, response is: ~p", [Url, HttpCode, RespBody]), + {noreply, State}; + {error, Reason} -> + lager:warning("[http_postman] url: ~p, get error: ~p", [Url, Reason]), + {noreply, State} + end. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/endpoint/src/endpoint_mqtt.erl b/apps/endpoint/src/endpoint_mqtt.erl new file mode 100644 index 0000000..cb50c17 --- /dev/null +++ b/apps/endpoint/src/endpoint_mqtt.erl @@ -0,0 +1,194 @@ + +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 06. 7月 2023 12:02 +%%%------------------------------------------------------------------- +-module(endpoint_mqtt). + +-include("endpoint.hrl"). +-behaviour(gen_server). + +%% API +-export([start_link/2]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +%% 消息重发间隔 +-define(RETRY_INTERVAL, 5000). + +-define(DISCONNECTED, disconnected). +-define(CONNECTED, connected). + +-record(state, { + endpoint :: #endpoint{}, + buffer :: endpoint_buffer:buffer(), + conn_pid :: undefined | pid(), + %% 待确认的数据, #{PacketId :: integer() => Id :: integer()} + inflight = #{}, + + status = disconnected +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% @doc Creates a gen_statem process which calls Module:init/1 to +%% initialize. To ensure a synchronized start-up procedure, this +%% function does not return until Module:init/1 has returned. +start_link(Name, Endpoint = #endpoint{}) when is_atom(Name) -> + gen_server:start_link({local, Name}, ?MODULE, [Endpoint], []). + +%%%=================================================================== +%%% gen_statem callbacks +%%%=================================================================== + +%% @private +%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or +%% gen_statem:start_link/[3,4], this function is called by the new +%% process to initialize. +init([Endpoint]) -> + erlang:process_flag(trap_exit, true), + %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 + erlang:start_timer(0, self(), create_postman), + %% 初始化存储 + Buffer = endpoint_buffer:new(Endpoint, 10), + + {ok, #state{endpoint = Endpoint, buffer = Buffer, status = ?DISCONNECTED}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(get_stat, _From, State = #state{buffer = Buffer}) -> + Stat = endpoint_buffer:stat(Buffer), + {reply, {ok, Stat}, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({forward, LocationCode, Fields, Timestamp}, State = #state{buffer = Buffer}) -> + NBuffer = endpoint_buffer:append({LocationCode, Fields, Timestamp}, Buffer), + {noreply, State#state{buffer = NBuffer}}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status = ?DISCONNECTED, + endpoint = #endpoint{title = Title, config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, client_id = ClientId}}}) -> + lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Title]), + Opts = [ + {owner, self()}, + {clientid, ClientId}, + {host, binary_to_list(Host)}, + {port, Port}, + {tcp_opts, []}, + {username, binary_to_list(Username)}, + {password, binary_to_list(Password)}, + {keepalive, 86400}, + {auto_ack, true}, + {connect_timeout, 5000}, + {proto_ver, v5}, + {retry_interval, 5000} + ], + + {ok, ConnPid} = emqtt:start_link(Opts), + lager:debug("[mqtt_postman] start connect, options: ~p", [Opts]), + case emqtt:connect(ConnPid, 5000) of + {ok, _} -> + lager:debug("[mqtt_postman] connect success, pid: ~p", [ConnPid]), + NBuffer = endpoint_buffer:trigger_n(Buffer), + {noreply, State#state{conn_pid = ConnPid, buffer = NBuffer, status = ?CONNECTED}}; + {error, Reason} -> + lager:warning("[mqtt_postman] connect get error: ~p", [Reason]), + erlang:start_timer(5000, self(), create_postman), + {noreply, State} + end; + +%% 离线时,忽略数据发送逻辑 +handle_info({next_data, _Id, _LocationCode, _Message}, State = #state{status = ?DISCONNECTED}) -> + {keep_state, State}; +%% 发送数据到mqtt服务器 +handle_info({next_data, Id, LocationCode, Message}, State = #state{status = ?CONNECTED, conn_pid = ConnPid, buffer = Buffer, inflight = InFlight, endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic0, qos = Qos}}}) -> + Topic = re:replace(Topic0, <<"\\${location_code}">>, LocationCode, [global, {return, binary}]), + lager:debug("[mqtt_postman] will publish topic: ~p, message: ~p, qos: ~p", [Topic, Message, Qos]), + case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of + ok -> + NBuffer = endpoint_buffer:ack(Id, Buffer), + {noreply, State#state{buffer = NBuffer}}; + {ok, PacketId} -> + {noreply, State#state{inflight = maps:put(PacketId, Id, InFlight)}}; + {error, Reason} -> + lager:warning("[mqtt_postman] send message to topic: ~p, get error: ~p", [Topic, Reason]), + {stop, Reason, State} + end; + +handle_info({disconnected, ReasonCode, Properties}, State = #state{status = ?CONNECTED}) -> + lager:debug("[mqtt_postman] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]), + erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), + {noreply, State#state{conn_pid = undefined, status = ?DISCONNECTED}}; + +handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload}}, State = #state{status = ?CONNECTED}) -> + lager:debug("[mqtt_postman] Recv a publish packet: ~p, payload: ~p", [Message, Payload]), + {noreply, State}; + +%% 收到确认的消息 +handle_info({puback, #{packet_id := PacketId}}, State = #state{status = ?CONNECTED, inflight = Inflight, buffer = Buffer}) -> + case maps:take(PacketId, Inflight) of + {Id, RestInflight} -> + NBuffer = endpoint_buffer:ack(Id, Buffer), + {noreply, State#state{buffer = NBuffer, inflight = RestInflight}}; + error -> + {noreply, State} + end; + +%% postman进程挂掉时,重新建立新的 +handle_info({'EXIT', ConnPid, Reason}, State = #state{endpoint = #endpoint{title = Title}, conn_pid = ConnPid}) -> + lager:warning("[enpoint_mqtt] endpoint: ~p, conn pid exit with reason: ~p", [Title, Reason]), + erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), + {noreply, State#state{conn_pid = undefined, status = ?DISCONNECTED}}; + +handle_info(Info, State = #state{status = Status}) -> + lager:warning("[iot_endpoint] unknown message: ~p, status: ~p", [Info, Status]), + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(Reason, #state{endpoint = #endpoint{title = Title}, buffer = Buffer}) -> + lager:debug("[iot_endpoint] endpoint: ~p, terminate with reason: ~p", [Title, Reason]), + endpoint_buffer:cleanup(Buffer), + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== \ No newline at end of file diff --git a/apps/endpoint/src/endpoint_mysql.erl b/apps/endpoint/src/endpoint_mysql.erl new file mode 100644 index 0000000..3baa49a --- /dev/null +++ b/apps/endpoint/src/endpoint_mysql.erl @@ -0,0 +1,186 @@ + +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 06. 7月 2023 12:02 +%%%------------------------------------------------------------------- +-module(endpoint_mysql). + +-include("endpoint.hrl"). +-behaviour(gen_server). + +%% API +-export([start_link/2]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +%% 消息重发间隔 +-define(RETRY_INTERVAL, 5000). + +-define(DISCONNECTED, disconnected). +-define(CONNECTED, connected). + +-record(state, { + endpoint :: #endpoint{}, + buffer :: endpoint_buffer:buffer(), + pool_pid :: undefined | pid(), + + status = ?DISCONNECTED +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% @doc Creates a gen_statem process which calls Module:init/1 to +%% initialize. To ensure a synchronized start-up procedure, this +%% function does not return until Module:init/1 has returned. +start_link(Name, Endpoint = #endpoint{}) when is_atom(Name) -> + gen_statem:start_link({local, Name}, ?MODULE, [Endpoint], []). + +%%%=================================================================== +%%% gen_statem callbacks +%%%=================================================================== + +%% @private +%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or +%% gen_statem:start_link/[3,4], this function is called by the new +%% process to initialize. +init([Endpoint]) -> + erlang:process_flag(trap_exit, true), + %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 + erlang:start_timer(0, self(), create_postman), + %% 初始化存储 + Buffer = endpoint_buffer:new(Endpoint, 10), + + {ok, #state{endpoint = Endpoint, buffer = Buffer, status = ?DISCONNECTED}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(get_stat, _From, State = #state{buffer = Buffer}) -> + Stat = endpoint_buffer:stat(Buffer), + {reply, {ok, Stat}, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({forward, LocationCode, Fields, Timestamp}, State = #state{buffer = Buffer}) -> + NBuffer = endpoint_buffer:append({LocationCode, Fields, Timestamp}, Buffer), + {noreply, State#state{buffer = NBuffer}}; + +handle_cast(cleanup, State = #state{buffer = Buffer}) -> + endpoint_buffer:cleanup(Buffer), + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({timeout, _, create_postman}, State = #state{status = ?DISCONNECTED, buffer = Buffer, endpoint = #endpoint{title = Title, config = #mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName, pool_size = PoolSize}}}) -> + lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Title]), + WorkerArgs = [ + {host, binary_to_list(Host)}, + {port, Port}, + {user, binary_to_list(Username)}, + {password, binary_to_list(Password)}, + {keep_alive, true}, + {database, binary_to_list(Database)}, + {queries, [<<"set names utf8">>]} + ], + + %% 启动工作的线程池 + case poolboy:start_link([{size, PoolSize}, {max_overflow, PoolSize}, {worker_module, mysql}], WorkerArgs) of + {ok, PoolPid} -> + NBuffer = endpoint_buffer:trigger_n(Buffer), + {noreply, State#state{pool_pid = PoolPid, buffer = NBuffer, status = ?CONNECTED}}; + ignore -> + erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), + {noreply, State}; + {error, Reason} -> + lager:warning("[mqtt_postman] start connect pool, get error: ~p", [Reason]), + erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), + {noreply, State} + end; + +%% 离线时,忽略数据发送逻辑 +handle_info({next_data, _Id, _LocationCode, _Message}, State = #state{status = ?DISCONNECTED}) -> + {noreply, State}; +%% 发送数据到mqtt服务器 +handle_info({next_data, Id, _LocationCode, Fields}, State = #state{status = ?CONNECTED, pool_pid = PoolPid, buffer = Buffer, + endpoint = #endpoint{title = Title, config = #mysql_endpoint{table_name = Table}}}) -> + + {ok, InsertSql, Values} = insert_sql(Table, Fields), + case poolboy:transaction(PoolPid, fun(ConnPid) -> mysql:query(ConnPid, InsertSql, Values) end) of + ok -> + NBuffer = endpoint_buffer:ack(Id, Buffer), + {noreply, State#state{buffer = NBuffer}}; + Error -> + lager:warning("[endpoint_mysql] endpoint: ~p, insert mysql get error: ~p", [Title, Error]), + {noreply, State} + end; + +%% postman进程挂掉时,重新建立新的 +handle_info({'EXIT', PoolPid, Reason}, State = #state{endpoint = #endpoint{title = Title}, pool_pid = PoolPid}) -> + lager:warning("[enpoint_mqtt] endpoint: ~p, conn pid exit with reason: ~p", [Title, Reason]), + erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), + {noreply, disconnected, State#state{pool_pid = undefined, status = ?DISCONNECTED}}; + +handle_info(Info, State = #state{status = Status}) -> + lager:warning("[iot_endpoint] unknown message: ~p, status: ~p", [Info, Status]), + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(Reason, #state{endpoint = #endpoint{title = Title}, buffer = Buffer}) -> + lager:debug("[iot_endpoint] endpoint: ~p, terminate with reason: ~p", [Title, Reason]), + endpoint_buffer:cleanup(Buffer), + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec insert_sql(Table :: binary(), Fields :: list()) -> {ok, Sql :: binary(), Values :: list()}. +insert_sql(Table, Fields) when is_binary(Table), is_list(Fields) -> + {Keys, Values} = kvs(Fields), + + FieldSql = iolist_to_binary(lists:join(<<", ">>, Keys)), + Placeholders = lists:duplicate(length(Keys), <<"?">>), + ValuesPlaceholder = iolist_to_binary(lists:join(<<", ">>, Placeholders)), + + {ok, <<"INSERT INTO ", Table/binary, "(", FieldSql/binary, ") VALUES(", ValuesPlaceholder/binary, ")">>, Values}. + +-spec kvs(Fields :: list()) -> {Keys :: list(), Values :: list()}. +kvs(Fields) when is_list(Fields) -> + {Keys0, Values0} = lists:foldl(fun({K, V}, {Acc0, Acc1}) -> {[K|Acc0], [V|Acc1]} end, {[], []}, Fields), + {lists:reverse(Keys0), lists:reverse(Values0)}. \ No newline at end of file diff --git a/apps/endpoint/src/endpoint_sup.erl b/apps/endpoint/src/endpoint_sup.erl index 8d8861b..2bf517e 100644 --- a/apps/endpoint/src/endpoint_sup.erl +++ b/apps/endpoint/src/endpoint_sup.erl @@ -6,8 +6,10 @@ -module(endpoint_sup). -behaviour(supervisor). +-include("endpoint.hrl"). -export([start_link/0]). +-export([ensured_endpoint_started/1, delete_endpoint/1]). -export([init/1]). @@ -26,10 +28,33 @@ start_link() -> %% type => worker(), % optional %% modules => modules()} % optional init([]) -> - SupFlags = #{strategy => one_for_all, - intensity => 0, - period => 1}, + SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, ChildSpecs = [], {ok, {SupFlags, ChildSpecs}}. %% internal functions + +-spec ensured_endpoint_started(Endpoint :: #endpoint{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}. +ensured_endpoint_started(Endpoint = #endpoint{}) -> + case supervisor:start_child(?MODULE, child_spec(Endpoint)) of + {ok, Pid} when is_pid(Pid) -> + {ok, Pid}; + {error, {'already_started', Pid}} when is_pid(Pid) -> + {ok, Pid}; + {error, Error} -> + {error, Error} + end. + +delete_endpoint(Id) when is_integer(Id) -> + Name = endpoint:get_name(Id), + supervisor:terminate_child(?MODULE, Name), + supervisor:delete_child(?MODULE, Name). + +child_spec(Endpoint = #endpoint{id = Id}) -> + Name = endpoint:get_name(Id), + #{id => Name, + start => {endpoint, start_link, [Name, Endpoint]}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['endpoint']}. \ No newline at end of file diff --git a/apps/endpoint/src/endpoint_timer.erl b/apps/endpoint/src/endpoint_timer.erl new file mode 100644 index 0000000..1244500 --- /dev/null +++ b/apps/endpoint/src/endpoint_timer.erl @@ -0,0 +1,128 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2024, +%%% @doc +%%% +%%% @end +%%% Created : 07. 5月 2024 10:30 +%%%------------------------------------------------------------------- +-module(endpoint_timer). +-author("anlicheng"). +-include("endpoint.hrl"). + +-behaviour(gen_server). + +%% API +-export([start_link/1]). +-export([task/3, ack/2, cleanup/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-record(state, { + retry_interval = 0, + %% 定时器 + timer_map = #{} +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +task(Pid, Id, Task) when is_pid(Pid), is_integer(Id), is_function(Task, 0) -> + gen_server:cast(Pid, {task, Id, Task}). + +ack(Pid, Id) when is_pid(Pid), is_integer(Id) -> + gen_server:cast(Pid, {ack, Id}). + +cleanup(Pid) when is_pid(Pid) -> + gen_server:cast(Pid, cleanup). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link(RetryInterval :: integer()) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link(RetryInterval) when is_integer(RetryInterval) -> + gen_server:start_link(?MODULE, [RetryInterval], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([RetryInterval]) -> + {ok, #state{retry_interval = RetryInterval}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({task, Id, Task}, State = #state{retry_interval = RetryInterval, timer_map = TimerMap}) -> + TimerRef = erlang:start_timer(RetryInterval, self(), {repost_ticker, {Id, Task}}), + {noreply, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}}; + +%% 取消 +handle_cast({ack, Id}, State = #state{timer_map = TimerMap}) -> + case maps:take(Id, TimerMap) of + error -> + {noreply, State}; + {TimerRef, NTimerMap} -> + is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), + {noreply, State#state{timer_map = NTimerMap}} + end; + +handle_cast(cleanup, State = #state{timer_map = TimerMap}) -> + lists:foreach(fun({_, TimerRef}) -> catch erlang:cancel_timer(TimerRef) end, maps:to_list(TimerMap)), + {noreply, State#state{timer_map = #{}}}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({timeout, _, {repost_ticker, {Id, Task}}}, State = #state{retry_interval = RetryInterval, timer_map = TimerMap}) -> + Task(), + TimerRef = erlang:start_timer(RetryInterval, self(), {repost_ticker, {Id, Task}}), + + {noreply, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%===================================================================