endpoint queue

This commit is contained in:
anlicheng 2023-07-26 18:11:47 +08:00
parent 955869d4e8
commit 0a229bbbb4
7 changed files with 208 additions and 96 deletions

View File

@ -95,7 +95,7 @@
%%
-record(north_data, {
ref :: reference(),
id :: integer(),
location_code :: binary(),
body :: binary()
}).

View File

@ -79,7 +79,13 @@ handle_request("POST", "/endpoint/update", _, Params = #{<<"name">> := Name}) wh
handle_request("POST", "/endpoint/delete", _, #{<<"name">> := Name}) when is_binary(Name) ->
case mnesia_endpoint:delete(Name) of
ok ->
iot_endpoint_sup:delete_endpoint(Name),
case iot_endpoint:get_pid(Name) of
undefined ->
{ok, 200, iot_util:json_error(404, <<"endpoint not found">>)};
Pid ->
iot_endpoint:clean_up(Pid),
iot_endpoint_sup:delete_endpoint(Name)
end,
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->

View File

@ -14,7 +14,7 @@
%% API
-export([start_link/2]).
-export([get_name/1, get_pid/1, forward/3, get_stat/1, reload/2]).
-export([get_name/1, get_pid/1, forward/3, get_stat/1, reload/2, clean_up/1]).
%% gen_statem callbacks
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
@ -24,15 +24,19 @@
-record(state, {
endpoint :: #endpoint{},
postman :: undefined | {atom(), pid()},
%%
ack_map = #{},
postman_pid :: undefined | pid(),
%% ,
tab_name :: atom(),
%% #north_data的id
cursor = 0 :: integer(),
%%
timer_map = #{},
%%
window_size = 10,
%%
flight_num = 0,
%%
acc_num = 0,
%% postman进程异常时
q = queue:new()
acc_num = 0
}).
%%%===================================================================
@ -62,6 +66,10 @@ reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) ->
get_stat(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, get_stat, 5000).
-spec clean_up(Pid :: pid()) -> ok.
clean_up(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, clean_up, 5000).
%% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned.
@ -76,12 +84,17 @@ start_link(Name, Endpoint = #endpoint{}) ->
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
%% gen_statem:start_link/[3,4], this function is called by the new
%% process to initialize.
init([Endpoint]) ->
init([Endpoint = #endpoint{name = Name}]) ->
erlang:process_flag(trap_exit, true),
%%
erlang:start_timer(0, self(), recreate_postman),
{ok, disconnected, #state{endpoint = Endpoint, postman = undefined}}.
%% ,
erlang:start_timer(0, self(), create_postman),
%%
TabName = binary_to_atom(<<"queue_data:", Name/binary>>),
%%
mnesia_queue:ensure_queue(TabName),
{ok, disconnected, #state{endpoint = Endpoint, tab_name = TabName, postman_pid = undefined}}.
%% @private
%% @doc This function is called by a gen_statem when it needs to find out
@ -97,11 +110,11 @@ callback_mode() ->
%%
handle_event(cast, {reload, NEndpoint}, disconnected, State = #state{endpoint = Endpoint}) ->
lager:warning("[iot_endpoint] reload endpoint, old: ~p, new: ~p", [Endpoint, Endpoint, NEndpoint]),
lager:warning("[iot_endpoint] state_name: disconnected, reload endpoint, old: ~p, new: ~p", [Endpoint, Endpoint, NEndpoint]),
{keep_state, State#state{endpoint = NEndpoint}};
handle_event(cast, {reload, NEndpoint = #endpoint{name = Name}}, connected, State = #state{endpoint = Endpoint, postman = {_, PostmanPid}}) ->
lager:debug("[iot_endpoint] reload endpoint, old: ~p~n, new: ~p", [Endpoint, NEndpoint]),
handle_event(cast, {reload, NEndpoint = #endpoint{name = Name}}, connected, State = #state{endpoint = Endpoint, timer_map = TimerMap, postman_pid = PostmanPid}) ->
lager:debug("[iot_endpoint] state_name: connected, reload endpoint, old: ~p, new: ~p", [Endpoint, NEndpoint]),
case mnesia_endpoint:config_equals(NEndpoint#endpoint.config, Endpoint#endpoint.config) of
true ->
lager:debug("[iot_endpoint] reload endpoint: ~p, config equals", [Name]),
@ -110,99 +123,112 @@ handle_event(cast, {reload, NEndpoint = #endpoint{name = Name}}, connected, Stat
%% postman的link关系
unlink(PostmanPid),
%% postman进程
PostmanPid ! stop,
catch PostmanPid ! stop,
%%
NState = stash(State),
lists:foreach(fun({_, TimerRef}) -> catch erlang:cancel_timer(TimerRef) end, maps:to_list(TimerMap)),
%% postman
erlang:start_timer(0, self(), recreate_postman),
erlang:start_timer(0, self(), create_postman),
{next_state, disconnected, NState#state{endpoint = NEndpoint, postman = undefined}}
{next_state, disconnected, State#state{endpoint = NEndpoint, timer_map = maps:new(), postman_pid = undefined}}
end;
handle_event(cast, {forward, LocationCode, Data}, _, State = #state{endpoint = Endpoint = #endpoint{mapper_fun = MapperFun}}) ->
handle_event(cast, {forward, LocationCode, Data}, StateName, State = #state{tab_name = TabName, window_size = WindowSize, flight_num = FlightNum, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}}) ->
try
Body = MapperFun(LocationCode, Data),
NorthData = #north_data{ref = make_ref(), location_code = LocationCode, body = Body},
gen_statem:cast(self(), {post, NorthData})
mnesia_queue:insert(TabName, #north_data{location_code = LocationCode, body = Body}),
%%
Actions = case StateName =:= connected andalso FlightNum < WindowSize of
true -> [{next_event, info, fetch_next}];
false -> []
end,
{keep_state, State, Actions}
catch _:Reason ->
lager:debug("[iot_endpoint] endpoint: ~p, mapper data get error: ~p", [Endpoint, Reason])
end,
lager:debug("[iot_endpoint] forward endpoint: ~p, mapper data get error: ~p", [Name, Reason]),
{keep_state, State}
end;
%%
handle_event(info, fetch_next, disconnected, State = #state{endpoint = #endpoint{name = Name}}) ->
lager:debug("[iot_endpoint] fetch_next endpoint: ~p, postman offline, data in queue", [Name]),
{keep_state, State};
handle_event(info, fetch_next, connected, State = #state{flight_num = FlightNum, window_size = WindowSize}) when FlightNum >= WindowSize ->
{keep_state, State};
handle_event(info, fetch_next, connected, State = #state{tab_name = TabName, cursor = Cursor, endpoint = #endpoint{name = Name}, postman_pid = PostmanPid, timer_map = TimerMap, flight_num = FlightNum}) ->
case catch mnesia_queue:dirty_fetch_next(TabName, Cursor) of
{ok, NCursor, NorthData = #north_data{id = Id}} ->
lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]),
handle_event(cast, {post, NorthData = #north_data{ref = Ref}}, connected, State = #state{endpoint = Endpoint, postman = {_, PostmanPid}, ack_map = AckMap, timer_map = TimerMap}) ->
lager:debug("[iot_endpoint] endpoint: ~p, postman online, north data is: ~p", [Endpoint, NorthData]),
PostmanPid ! {post, NorthData},
%%
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}),
PostmanPid ! {post, NorthData},
%%
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}),
{keep_state, State#state{ack_map = maps:put(Ref, NorthData, AckMap), timer_map = maps:put(Ref, TimerRef, TimerMap)}};
handle_event(cast, {post, NorthData}, disconnected, State = #state{endpoint = Endpoint, q = Q}) ->
lager:debug("[iot_endpoint] endpoint: ~p, postman offline, data in queue", [Endpoint]),
{keep_state, State#state{q = queue:in(NorthData, Q)}};
{keep_state, State#state{cursor = NCursor, timer_map = maps:put(Id, TimerRef, TimerMap), flight_num = FlightNum + 1}};
'$end_of_table' ->
lager:debug("[iot_endpoint] endpoint:~p, fetch_next failed $end_of_table", [Name]),
{keep_state, State};
Error ->
lager:debug("[iot_endpoint] endpoint:~p, fetch_next get error: ~p", [Name, Error]),
{keep_state, State}
end;
%%
handle_event(info, {ack, Ref}, _, State = #state{ack_map = AckMap, timer_map = TimerMap, acc_num = AccNum}) ->
NAckMap = maps:remove(Ref, AckMap),
NTimerMap = case maps:take(Ref, TimerMap) of
error ->
TimerMap;
{TimerRef, TimerMap0} ->
catch erlang:cancel_timer(TimerRef),
TimerMap0
end,
{keep_state, State#state{ack_map = NAckMap, timer_map = NTimerMap, acc_num = AccNum + 1}};
handle_event(info, {ack, Id}, StateName, State = #state{tab_name = TabName, endpoint = #endpoint{name = Name}, timer_map = TimerMap, acc_num = AccNum, flight_num = FlightNum}) ->
lager:debug("[iot_endpoint] endpoint: ~p, get ack: ~p", [Name, Id]),
mnesia_queue:delete(TabName, Id),
Actions = case StateName =:= connected of
true -> [{next_event, info, fetch_next}];
false -> []
end,
{keep_state, State#state{timer_map = remove_timer(Id, TimerMap), acc_num = AccNum + 1, flight_num = FlightNum - 1}, Actions};
%%
handle_event(info, {timeout, _, {repost_ticker, NorthData = #north_data{ref = Ref}}}, connected, State = #state{postman = {_, PostmanPid}, timer_map = TimerMap}) ->
handle_event(info, {timeout, _, {repost_ticker, NorthData = #north_data{id = Id}}}, connected, State = #state{endpoint = #endpoint{name = Name}, postman_pid = PostmanPid, timer_map = TimerMap}) ->
lager:debug("[iot_endpoint] endpoint: ~p, repost data: ~p", [Name, Id]),
PostmanPid ! {post, NorthData},
%% 5
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}),
{keep_state, State#state{timer_map = maps:put(Ref, TimerRef, TimerMap)}};
{keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}};
%% 线
handle_event(info, {timeout, _, {repost_ticker, _}}, disconnected, State) ->
{keep_state, State};
handle_event(info, {timeout, _, recreate_postman}, disconnected, State = #state{endpoint = Endpoint, ack_map = AckMap, timer_map = TimerMap, q = Q}) ->
lager:debug("[iot_endpoint] recreate postman: ~p", [Endpoint]),
try create_postman(Endpoint) of
{ok, Postman = {_, PostmanPid}} ->
lager:debug("[iot_endpoint] queue data is: ~p", [queue:to_list(Q)]),
%%
{NAckMap, NTimerMap} = lists:foldl(fun(NorthData = #north_data{ref = Ref}, {AckMap0, TimerMap0}) ->
PostmanPid ! {post, NorthData},
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}),
{maps:put(Ref, NorthData, AckMap0), maps:put(Ref, TimerRef, TimerMap0)}
end, {AckMap, TimerMap}, queue:to_list(Q)),
%%
{next_state, connected, State#state{endpoint = Endpoint, postman = Postman, ack_map = NAckMap, timer_map = NTimerMap, q = queue:new()}}
catch _:Reason ->
lager:warning("[iot_endpoint] recreate postman get error: ~p", [Reason]),
erlang:start_timer(?RETRY_INTERVAL, self(), recreate_postman),
handle_event(info, {timeout, _, create_postman}, disconnected, State = #state{endpoint = Endpoint = #endpoint{name = Name}, window_size = WindowSize}) ->
lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Name]),
try
{ok, PostmanPid} = create_postman(Endpoint),
%% window_size
Actions = lists:map(fun(_) -> {next_event, info, fetch_next} end, lists:seq(1, WindowSize)),
{next_state, connected, State#state{endpoint = Endpoint, postman_pid = PostmanPid, timer_map = maps:new(), flight_num = 0}, Actions}
catch _:Error ->
lager:warning("[iot_endpoint] endpoint: ~p, create postman get error: ~p", [Name, Error]),
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
{keep_state, State#state{endpoint = Endpoint, postman = undefined}}
{keep_state, State#state{endpoint = Endpoint, postman_pid = undefined}}
end;
%%
handle_event({call, From}, clean_up, _, State = #state{tab_name = TabName}) ->
mnesia:delete_table(TabName),
{keep_state, State, [{reply, From, ok}]};
%%
handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum, ack_map = AckMap, q = Q}) ->
handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum, tab_name = TabName}) ->
Stat = #{
<<"acc_num">> => AccNum,
<<"unconfirmed_num">> => maps:size(AckMap),
<<"queue_num">> => queue:len(Q),
<<"state">> => atom_to_binary(StateName)
<<"queue_num">> => mnesia_queue:table_size(TabName),
<<"state_name">> => atom_to_binary(StateName)
},
{keep_state, State, [{reply, From, Stat}]};
%% ,
handle_event(info, {'EXIT', PostmanPid, Reason}, connected, State = #state{ack_map = AckMap, postman = {_, PostmanPid}}) ->
lager:warning("[iot_endpoint] postman exited, current ack_map: ~p, reason: ~p", [AckMap, Reason]),
NState = stash(State),
handle_event(info, {'EXIT', PostmanPid, Reason}, connected, State = #state{endpoint = #endpoint{name = Name}, timer_map = TimerMap, postman_pid = PostmanPid}) ->
lager:warning("[iot_endpoint] endpoint: ~p, postman exited with reason: ~p", [Name, Reason]),
lists:foreach(fun({_, TimerRef}) -> catch erlang:cancel_timer(TimerRef) end, maps:to_list(TimerMap)),
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
erlang:start_timer(?RETRY_INTERVAL, self(), recreate_postman),
{next_state, disconnected, NState#state{postman = undefined}};
{next_state, disconnected, State#state{timer_map = maps:new(), postman_pid = undefined}};
%% @private
%% @doc If callback_mode is handle_event_function, then whenever a
@ -229,20 +255,20 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
%%% Internal functions
%%%===================================================================
%%
stash(State = #state{ack_map = AckMap, timer_map = TimerMap, q = Q}) ->
Q1 = lists:foldl(fun({_, NorthData}, Q0) -> queue:in(NorthData, Q0) end, Q, maps:to_list(AckMap)),
%% timer
lists:foreach(fun({_, TimerRef}) -> catch erlang:cancel_timer(TimerRef) end, maps:to_list(TimerMap)),
State#state{q = Q1, ack_map = #{}, postman = undefined}.
-spec remove_timer(Id :: integer(), TimerMap :: #{}) -> NTimerMap :: #{}.
remove_timer(Id, TimerMap) when is_integer(Id), is_map(TimerMap) ->
case maps:take(Id, TimerMap) of
error ->
TimerMap;
{TimerRef, TimerMap0} ->
catch erlang:cancel_timer(TimerRef),
TimerMap0
end.
%% http和https协议的支持
create_postman(#endpoint{name = Name, config = #http_endpoint{url = Url, pool_size = PoolSize}}) ->
PoolName = binary_to_atom(<<"http_pool:", Name/binary>>),
{ok, PostmanPid} = http_postman:start_link(self(), Url, PoolName, PoolSize),
{ok, {http, PostmanPid}};
http_postman:start_link(self(), Url, PoolName, PoolSize);
%% mqtt协议的支持
create_postman(#endpoint{name = Name, config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}}) ->
@ -262,8 +288,6 @@ create_postman(#endpoint{name = Name, config = #mqtt_endpoint{host = Host, port
{retry_interval, 5}
],
{ok, PostmanPid} = mqtt_postman:start_link(self(), Opts, Topic, Qos),
{ok, {mqtt, PostmanPid}};
mqtt_postman:start_link(self(), Opts, Topic, Qos);
create_postman(#endpoint{}) ->
throw(<<"not supported">>).

View File

@ -0,0 +1,64 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 26. 7 2023 10:40
%%%-------------------------------------------------------------------
-module(mnesia_queue).
-author("aresei").
-include("iot.hrl").
%% API
-export([insert/2, delete/2, ensure_queue/1, table_size/1, dirty_fetch_next/2]).
-spec insert(Tab :: atom(), #north_data{}) -> ok | {error, Reason :: any()}.
insert(Tab, Item = #north_data{}) ->
Id = mnesia_id_generator:next_id(Tab),
NItem = Item#north_data{id = Id},
case mnesia:transaction(fun() -> mnesia:write(Tab, NItem, write) end) of
{atomic, ok} ->
ok;
{aborted, Reason} ->
{error, Reason}
end.
-spec delete(Tab :: atom(), Key :: any()) -> ok | {error, Reason :: any()}.
delete(Tab, Key) when is_atom(Tab) ->
case mnesia:transaction(fun() -> mnesia:delete(Tab, Key, write) end) of
{atomic, ok} ->
ok;
{aborted, Reason} ->
{error, Reason}
end.
%%
-spec ensure_queue(Name :: atom()) -> no_return().
ensure_queue(Name) when is_atom(Name) ->
Tables = mnesia:system_info(tables),
case lists:member(Name, Tables) of
true ->
mnesia:wait_for_tables([Name], infinity);
false ->
mnesia:create_table(Name, [
{attributes, record_info(fields, north_data)},
{record_name, north_data},
{disc_copies, [node()]},
{type, ordered_set}
])
end.
-spec table_size(Tab :: atom()) -> integer().
table_size(Tab) when is_atom(Tab) ->
mnesia:table_info(Tab, size).
-spec dirty_fetch_next(Tab :: atom(), Cursor :: integer()) -> {ok, NCursor :: integer(), Item :: any()} | '$end_of_table'.
dirty_fetch_next(Tab, Cursor) when is_atom(Tab), is_integer(Cursor) ->
case mnesia:dirty_next(Tab, Cursor) of
'$end_of_table' ->
'$end_of_table';
NextKey ->
[Item] = mnesia:dirty_read(Tab, NextKey),
{ok, NextKey, Item}
end.

View File

@ -13,11 +13,29 @@
%% API
-export([rsa_encode/1]).
-export([insert_services/1]).
-export([insert_endpoints/0]).
-export([insert_endpoints/0, forward/0]).
forward() ->
Name = <<"zhongguodianli">>,
Pid = iot_endpoint:get_pid(Name),
lists:foreach(fun(Id0) ->
Id = integer_to_binary(Id0),
Fields = [
#{
<<"key">> => <<"test:", Id/binary>>,
<<"value">> => Id,
<<"unit">> => <<"cm">>
}
],
iot_endpoint:forward(Pid, <<"location_code:", Id/binary>>, Fields)
end, lists:seq(1, 10000)).
insert_endpoints() ->
Mapper0 = "fun(LocationCode, Fields) ->
Bin = jiffy:encode(Fields#{<<\"location_code\">> => LocationCode}, [force_utf8]),
Fields1 = lists:map(fun(#{<<\"key\">> := Key, <<\"value\">> := Val}) -> {Key, Val} end, Fields),
Fields2 = maps:from_list(Fields1),
Bin = jiffy:encode(Fields2#{<<\"location_code\">> => LocationCode}, [force_utf8]),
iolist_to_binary(Bin)
end.",

View File

@ -87,11 +87,11 @@ handle_info(stop, State = #state{pool_name = PoolName, worker_pool_pid = WorkerP
{stop, normal, State};
handle_info({post, #north_data{body = Body, ref = Ref}}, State = #state{parent_pid = ParentPid, url = Url, pool_name = PoolName, worker_pool_pid = WorkerPoolPid}) ->
handle_info({post, #north_data{body = Body, id = Id}}, State = #state{parent_pid = ParentPid, url = Url, pool_name = PoolName, worker_pool_pid = WorkerPoolPid}) ->
poolboy:transaction(WorkerPoolPid, fun(Pid) ->
case http_postman_worker:post(Pid, Url, Body, PoolName) of
ok ->
ParentPid ! {ack, Ref};
ParentPid ! {ack, Id};
{error, Reason} ->
lager:debug("[http_postman] post url: ~p, body: ~p, get error: ~p", [Url, Body, Reason])
end

View File

@ -90,9 +90,9 @@ handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload}},
{noreply, State};
handle_info({puback, Packet = #{packet_id := PacketId}}, State = #state{parent_pid = ParentPid, inflight = Inflight}) ->
case maps:take(PacketId, Inflight) of
{{Ref, Message}, RestInflight} ->
{{Id, Message}, RestInflight} ->
lager:debug("[mqtt_postman] receive puback packet: ~p, assoc message: ~p", [Packet, Message]),
ParentPid ! {ack, Ref},
ParentPid ! {ack, Id},
{noreply, State#state{inflight = RestInflight}};
error ->
lager:warning("[mqtt_postman] receive unknown puback packet: ~p", [Packet]),
@ -100,16 +100,16 @@ handle_info({puback, Packet = #{packet_id := PacketId}}, State = #state{parent_p
end;
%%
handle_info({post, #north_data{ref = Ref, location_code = LocationCode, body = Message}}, State = #state{parent_pid = ParentPid, conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) ->
handle_info({post, #north_data{id = Id, location_code = LocationCode, body = Message}}, State = #state{parent_pid = ParentPid, conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) ->
Topic = re:replace(Topic0, <<"\\${location_code}">>, LocationCode, [global, {return, binary}]),
lager:debug("[mqtt_postman] will publish topic: ~p, message: ~p, qos: ~p", [Topic, Message, Qos]),
case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of
ok ->
ParentPid ! {ack, Ref},
ParentPid ! {ack, Id},
{noreply, State};
{ok, PacketId} ->
lager:debug("[mqtt_postman] send success, packet_id: ~p", [PacketId]),
{noreply, State#state{inflight = maps:put(PacketId, {Ref, Message}, InFlight)}};
{noreply, State#state{inflight = maps:put(PacketId, {Id, Message}, InFlight)}};
{error, Reason} ->
lager:warning("[mqtt_postman] send message to topic: ~p, get error: ~p", [Topic, Reason]),
{stop, Reason, State}