This commit is contained in:
anlicheng 2024-05-07 17:14:51 +08:00
parent f31ad3fb05
commit b9f16f3524
4 changed files with 133 additions and 190 deletions

View File

@ -41,14 +41,14 @@ get_pid(Id) when is_integer(Id) ->
forward(undefined, _, _, _) ->
ok;
forward(Pid, LocationCode, Fields, Timestamp) when is_pid(Pid), is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) ->
Pid ! {forward, LocationCode, Fields, Timestamp}.
gen_server:cast(Pid, {forward, LocationCode, Fields, Timestamp}).
reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) ->
gen_statem:cast(Pid, {reload, NEndpoint}).
-spec clean_up(Pid :: pid()) -> ok.
clean_up(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, clean_up, 5000).
gen_server:call(Pid, clean_up, 5000).
-spec config_equals(any(), any()) -> boolean().
config_equals(#http_endpoint{url = Url}, #http_endpoint{url = Url}) ->

View File

@ -14,7 +14,6 @@
%% API
-export([start_link/2]).
-export([get_name/1, get_pid/1, forward/4, get_stat/1, cleanup/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@ -30,28 +29,6 @@
%%% API
%%%===================================================================
-spec get_name(Name :: binary() | #endpoint{}) -> atom().
get_name(Id) when is_integer(Id) ->
list_to_atom("endpoint:" ++ integer_to_list(Id)).
-spec get_pid(Id :: integer()) -> undefined | pid().
get_pid(Id) when is_integer(Id) ->
whereis(get_name(Id)).
-spec forward(Pid :: undefined | pid(), LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return().
forward(undefined, _, _, _) ->
ok;
forward(Pid, LocationCode, Fields, Timestamp) when is_pid(Pid), is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) ->
gen_server:cast(Pid, {forward, LocationCode, Fields, Timestamp}).
-spec get_stat(Pid :: pid()) -> {ok, Stat :: #{}}.
get_stat(Pid) when is_pid(Pid) ->
gen_server:call(Pid, get_stat, 5000).
-spec cleanup(Pid :: pid()) -> ok.
cleanup(Pid) when is_pid(Pid) ->
gen_server:cast(Pid, cleanup).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(Name :: atom(), Endpoint :: #http_endpoint{}) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).

View File

@ -10,64 +10,39 @@
-module(endpoint_mqtt).
-include("endpoint.hrl").
-behaviour(gen_statem).
-behaviour(gen_server).
%% API
-export([start_link/2]).
-export([get_name/1, get_pid/1, forward/4, get_stat/1, reload/2, clean_up/1, get_mapper_fun/1]).
%% gen_statem callbacks
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
%%
-define(RETRY_INTERVAL, 5000).
-define(DISCONNECTED, disconnected).
-define(CONNECTED, connected).
-record(state, {
endpoint :: #endpoint{},
buffer :: endpoint_buffer:buffer(),
conn_pid :: undefined | pid(),
%% , #{PacketId :: integer() => Id :: integer()}
inflight = #{}
inflight = #{},
status = disconnected
}).
%%%===================================================================
%%% API
%%%===================================================================
-spec get_name(Id :: integer()) -> atom().
get_name(Id) when is_integer(Id) ->
list_to_atom("endpoint:" ++ integer_to_list(Id)).
-spec get_pid(Name :: binary()) -> undefined | pid().
get_pid(Name) when is_binary(Name) ->
whereis(get_name(Name)).
-spec forward(Pid :: undefined | pid(), LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return().
forward(undefined, _, _, _) ->
ok;
forward(Pid, LocationCode, Fields, Timestamp) when is_pid(Pid), is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) ->
gen_statem:cast(Pid, {forward, LocationCode, Fields, Timestamp}).
reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) ->
gen_statem:cast(Pid, {reload, NEndpoint}).
-spec get_stat(Pid :: pid()) -> {ok, Stat :: #{}}.
get_stat(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, get_stat, 5000).
-spec clean_up(Pid :: pid()) -> ok.
clean_up(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, clean_up, 5000).
-spec get_mapper_fun(Pid :: pid()) -> fun().
get_mapper_fun(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, get_mapper_fun).
%% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned.
start_link(Name, Endpoint = #endpoint{}) when is_atom(Name) ->
gen_statem:start_link({local, Name}, ?MODULE, [Endpoint], []).
gen_server:start_link({local, Name}, ?MODULE, [Endpoint], []).
%%%===================================================================
%%% gen_statem callbacks
@ -84,25 +59,39 @@ init([Endpoint]) ->
%%
Buffer = endpoint_buffer:new(Endpoint, 10),
{ok, disconnected, #state{endpoint = Endpoint, buffer = Buffer}}.
{ok, #state{endpoint = Endpoint, buffer = Buffer, status = ?DISCONNECTED}}.
%% @private
%% @doc This function is called by a gen_statem when it needs to find out
%% the callback mode of the callback module.
callback_mode() ->
handle_event_function.
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(get_stat, _From, State = #state{buffer = Buffer}) ->
Stat = endpoint_buffer:stat(Buffer),
{reply, {ok, Stat}, State}.
%% @private
%% @doc There should be one instance of this function for each possible
%% state name. If callback_mode is state_functions, one of these
%% functions is called when gen_statem receives and event from
%% call/2, cast/2, or as a normal process message.
handle_event(cast, {forward, LocationCode, Fields, Timestamp}, _StateName, State = #state{buffer = Buffer}) ->
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({forward, LocationCode, Fields, Timestamp}, State = #state{buffer = Buffer}) ->
NBuffer = endpoint_buffer:append({LocationCode, Fields, Timestamp}, Buffer),
{keep_state, State#state{buffer = NBuffer}};
{noreply, State#state{buffer = NBuffer}}.
handle_event(info, {timeout, _, create_postman}, disconnected, State = #state{buffer = Buffer,
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status = ?DISCONNECTED,
endpoint = #endpoint{title = Title, config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, client_id = ClientId}}}) ->
lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Title]),
Opts = [
@ -126,89 +115,79 @@ handle_event(info, {timeout, _, create_postman}, disconnected, State = #state{bu
{ok, _} ->
lager:debug("[mqtt_postman] connect success, pid: ~p", [ConnPid]),
NBuffer = endpoint_buffer:trigger_n(Buffer),
{next_state, connected, State#state{conn_pid = ConnPid, buffer = NBuffer}};
{noreply, State#state{conn_pid = ConnPid, buffer = NBuffer, status = ?CONNECTED}};
{error, Reason} ->
lager:warning("[mqtt_postman] connect get error: ~p", [Reason]),
erlang:start_timer(5000, self(), create_postman),
{keep_state, State}
{noreply, State}
end;
handle_event({call, From}, get_mapper_fun, _, State = #state{endpoint = #endpoint{mapper_fun = F}}) ->
{keep_state, State, [{reply, From, F}]};
%%
handle_event({call, From}, get_stat, _StateName, State = #state{buffer = Buffer}) ->
Stat = endpoint_buffer:stat(Buffer),
{keep_state, State, [{reply, From, Stat}]};
%% 线
handle_event(info, {next_data, _Id, _LocationCode, _Message}, disconnected, State) ->
handle_info({next_data, _Id, _LocationCode, _Message}, State = #state{status = ?DISCONNECTED}) ->
{keep_state, State};
%% mqtt服务器
handle_event(info, {next_data, Id, LocationCode, Message}, connected,
State = #state{conn_pid = ConnPid, buffer = Buffer, inflight = InFlight, endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic0, qos = Qos}}}) ->
handle_info({next_data, Id, LocationCode, Message}, State = #state{status = ?CONNECTED, conn_pid = ConnPid, buffer = Buffer, inflight = InFlight, endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic0, qos = Qos}}}) ->
Topic = re:replace(Topic0, <<"\\${location_code}">>, LocationCode, [global, {return, binary}]),
lager:debug("[mqtt_postman] will publish topic: ~p, message: ~p, qos: ~p", [Topic, Message, Qos]),
case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of
ok ->
NBuffer = endpoint_buffer:ack(Id, Buffer),
{keep_state, State#state{buffer = NBuffer}};
{noreply, State#state{buffer = NBuffer}};
{ok, PacketId} ->
{keep_state, State#state{inflight = maps:put(PacketId, Id, InFlight)}};
{noreply, State#state{inflight = maps:put(PacketId, Id, InFlight)}};
{error, Reason} ->
lager:warning("[mqtt_postman] send message to topic: ~p, get error: ~p", [Topic, Reason]),
{stop, Reason, State}
end;
handle_event(info, {disconnected, ReasonCode, Properties}, connected, State) ->
handle_info({disconnected, ReasonCode, Properties}, State = #state{status = ?CONNECTED}) ->
lager:debug("[mqtt_postman] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]),
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
{next_state, disconnected, State#state{conn_pid = undefined}};
{noreply, State#state{conn_pid = undefined, status = ?DISCONNECTED}};
handle_event(info, {publish, Message = #{packet_id := _PacketId, payload := Payload}}, connected, State) ->
handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload}}, State = #state{status = ?CONNECTED}) ->
lager:debug("[mqtt_postman] Recv a publish packet: ~p, payload: ~p", [Message, Payload]),
{keep_state, State};
{noreply, State};
%%
handle_event(info, {puback, #{packet_id := PacketId}}, connected, State = #state{inflight = Inflight, buffer = Buffer}) ->
handle_info({puback, #{packet_id := PacketId}}, State = #state{status = ?CONNECTED, inflight = Inflight, buffer = Buffer}) ->
case maps:take(PacketId, Inflight) of
{Id, RestInflight} ->
NBuffer = endpoint_buffer:ack(Id, Buffer),
{keep_state, State#state{buffer = NBuffer, inflight = RestInflight}};
{noreply, State#state{buffer = NBuffer, inflight = RestInflight}};
error ->
{keep_state, State}
{noreply, State}
end;
%% postman进程挂掉时
handle_event(info, {'EXIT', ConnPid, Reason}, connected, State = #state{endpoint = #endpoint{title = Title}, conn_pid = ConnPid}) ->
handle_info({'EXIT', ConnPid, Reason}, State = #state{endpoint = #endpoint{title = Title}, conn_pid = ConnPid}) ->
lager:warning("[enpoint_mqtt] endpoint: ~p, conn pid exit with reason: ~p", [Title, Reason]),
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
{next_state, disconnected, State#state{conn_pid = undefined}};
{noreply, State#state{conn_pid = undefined, status = ?DISCONNECTED}};
handle_info(Info, State = #state{status = Status}) ->
lager:warning("[iot_endpoint] unknown message: ~p, status: ~p", [Info, Status]),
{noreply, State}.
%% @private
%% @doc If callback_mode is handle_event_function, then whenever a
%% gen_statem receives an event from call/2, cast/2, or as a normal
%% process message, this function is called.
handle_event(EventType, Event, StateName, State) ->
lager:warning("[iot_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]),
{keep_state, State}.
%% @private
%% @doc This function is called by a gen_statem when it is about to
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_statem terminates with
%% Reason. The return value is ignored.
terminate(Reason, _StateName, #state{endpoint = #endpoint{title = Title}, buffer = Buffer}) ->
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(Reason, #state{endpoint = #endpoint{title = Title}, buffer = Buffer}) ->
lager:debug("[iot_endpoint] endpoint: ~p, terminate with reason: ~p", [Title, Reason]),
endpoint_buffer:cleanup(Buffer),
ok.
%% @private
%% @doc Convert process state when code is changed
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
{ok, StateName, State}.
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions

View File

@ -10,53 +10,32 @@
-module(endpoint_mysql).
-include("endpoint.hrl").
-behaviour(gen_statem).
-behaviour(gen_server).
%% API
-export([start_link/2]).
-export([get_name/1, get_pid/1, forward/4, get_stat/1, reload/2, cleanup/1]).
%% gen_statem callbacks
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
%%
-define(RETRY_INTERVAL, 5000).
-define(DISCONNECTED, disconnected).
-define(CONNECTED, connected).
-record(state, {
endpoint :: #endpoint{},
buffer :: endpoint_buffer:buffer(),
pool_pid :: undefined | pid()
pool_pid :: undefined | pid(),
status = ?DISCONNECTED
}).
%%%===================================================================
%%% API
%%%===================================================================
-spec get_name(Id :: integer()) -> atom().
get_name(Id) when is_integer(Id) ->
list_to_atom("endpoint:" ++ integer_to_list(Id)).
-spec get_pid(Name :: binary()) -> undefined | pid().
get_pid(Name) when is_binary(Name) ->
whereis(get_name(Name)).
-spec forward(Pid :: undefined | pid(), LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return().
forward(undefined, _, _, _) ->
ok;
forward(Pid, LocationCode, Fields, Timestamp) when is_pid(Pid), is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) ->
gen_statem:cast(Pid, {forward, LocationCode, Fields, Timestamp}).
reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) ->
gen_statem:cast(Pid, {reload, NEndpoint}).
-spec get_stat(Pid :: pid()) -> {ok, Stat :: #{}}.
get_stat(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, get_stat, 5000).
-spec cleanup(Pid :: pid()) -> ok.
cleanup(Pid) when is_pid(Pid) ->
gen_statem:cast(Pid, cleanup).
%% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned.
@ -78,30 +57,43 @@ init([Endpoint]) ->
%%
Buffer = endpoint_buffer:new(Endpoint, 10),
{ok, disconnected, #state{endpoint = Endpoint, buffer = Buffer}}.
{ok, #state{endpoint = Endpoint, buffer = Buffer, status = ?DISCONNECTED}}.
%% @private
%% @doc This function is called by a gen_statem when it needs to find out
%% the callback mode of the callback module.
callback_mode() ->
handle_event_function.
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(get_stat, _From, State = #state{buffer = Buffer}) ->
Stat = endpoint_buffer:stat(Buffer),
{reply, {ok, Stat}, State}.
%% @private
%% @doc There should be one instance of this function for each possible
%% state name. If callback_mode is state_functions, one of these
%% functions is called when gen_statem receives and event from
%% call/2, cast/2, or as a normal process message.
handle_event(cast, {forward, LocationCode, Fields, Timestamp}, _StateName, State = #state{buffer = Buffer}) ->
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({forward, LocationCode, Fields, Timestamp}, State = #state{buffer = Buffer}) ->
NBuffer = endpoint_buffer:append({LocationCode, Fields, Timestamp}, Buffer),
{keep_state, State#state{buffer = NBuffer}};
{noreply, State#state{buffer = NBuffer}};
handle_event(cast, cleanup, _StateName, State = #state{buffer = Buffer}) ->
handle_cast(cleanup, State = #state{buffer = Buffer}) ->
endpoint_buffer:cleanup(Buffer),
{keep_state, State};
{noreply, State}.
handle_event(info, {timeout, _, create_postman}, disconnected,
State = #state{buffer = Buffer, endpoint = #endpoint{title = Title, config = #mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName, pool_size = PoolSize}}}) ->
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({timeout, _, create_postman}, State = #state{status = ?DISCONNECTED, buffer = Buffer, endpoint = #endpoint{title = Title, config = #mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName, pool_size = PoolSize}}}) ->
lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Title]),
WorkerArgs = [
{host, binary_to_list(Host)},
@ -117,67 +109,62 @@ handle_event(info, {timeout, _, create_postman}, disconnected,
case poolboy:start_link([{size, PoolSize}, {max_overflow, PoolSize}, {worker_module, mysql}], WorkerArgs) of
{ok, PoolPid} ->
NBuffer = endpoint_buffer:trigger_n(Buffer),
{next_state, connected, State#state{pool_pid = PoolPid, buffer = NBuffer}};
{noreply, State#state{pool_pid = PoolPid, buffer = NBuffer, status = ?CONNECTED}};
ignore ->
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
{keep_state, State};
{noreply, State};
{error, Reason} ->
lager:warning("[mqtt_postman] start connect pool, get error: ~p", [Reason]),
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
{keep_state, State}
{noreply, State}
end;
%%
handle_event({call, From}, get_stat, _StateName, State = #state{buffer = Buffer}) ->
Stat = endpoint_buffer:stat(Buffer),
{keep_state, State, [{reply, From, Stat}]};
%% 线
handle_event(info, {next_data, _Id, _LocationCode, _Message}, disconnected, State) ->
{keep_state, State};
handle_info({next_data, _Id, _LocationCode, _Message}, State = #state{status = ?DISCONNECTED}) ->
{noreply, State};
%% mqtt服务器
handle_event(info, {next_data, Id, _LocationCode, Fields}, connected,
State = #state{pool_pid = PoolPid, buffer = Buffer, endpoint = #endpoint{title = Title, config = #mysql_endpoint{table_name = Table}}}) ->
handle_info({next_data, Id, _LocationCode, Fields}, State = #state{status = ?CONNECTED, pool_pid = PoolPid, buffer = Buffer,
endpoint = #endpoint{title = Title, config = #mysql_endpoint{table_name = Table}}}) ->
{ok, InsertSql, Values} = insert_sql(Table, Fields),
case poolboy:transaction(PoolPid, fun(ConnPid) -> mysql:query(ConnPid, InsertSql, Values) end) of
ok ->
NBuffer = endpoint_buffer:ack(Id, Buffer),
{keep_state, State#state{buffer = NBuffer}};
{noreply, State#state{buffer = NBuffer}};
Error ->
lager:warning("[endpoint_mysql] endpoint: ~p, insert mysql get error: ~p", [Title, Error]),
{keep_state, State}
{noreply, State}
end;
%% postman进程挂掉时
handle_event(info, {'EXIT', PoolPid, Reason}, connected, State = #state{endpoint = #endpoint{title = Title}, pool_pid = PoolPid}) ->
handle_info({'EXIT', PoolPid, Reason}, State = #state{endpoint = #endpoint{title = Title}, pool_pid = PoolPid}) ->
lager:warning("[enpoint_mqtt] endpoint: ~p, conn pid exit with reason: ~p", [Title, Reason]),
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
{next_state, disconnected, State#state{pool_pid = undefined}};
{noreply, disconnected, State#state{pool_pid = undefined, status = ?DISCONNECTED}};
handle_info(Info, State = #state{status = Status}) ->
lager:warning("[iot_endpoint] unknown message: ~p, status: ~p", [Info, Status]),
{noreply, State}.
%% @private
%% @doc If callback_mode is handle_event_function, then whenever a
%% gen_statem receives an event from call/2, cast/2, or as a normal
%% process message, this function is called.
handle_event(EventType, Event, StateName, State) ->
lager:warning("[iot_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]),
{keep_state, State}.
%% @private
%% @doc This function is called by a gen_statem when it is about to
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_statem terminates with
%% Reason. The return value is ignored.
terminate(Reason, _StateName, #state{endpoint = #endpoint{title = Title}, buffer = Buffer}) ->
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(Reason, #state{endpoint = #endpoint{title = Title}, buffer = Buffer}) ->
lager:debug("[iot_endpoint] endpoint: ~p, terminate with reason: ~p", [Title, Reason]),
endpoint_buffer:cleanup(Buffer),
ok.
%% @private
%% @doc Convert process state when code is changed
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
{ok, StateName, State}.
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions