简化中电数据缓存逻辑
This commit is contained in:
parent
43876e3bff
commit
40860f197a
@ -27,8 +27,9 @@
|
||||
postman_pid :: undefined | pid(),
|
||||
logger_pid :: pid(),
|
||||
|
||||
%% 当前数据的游标, #north_data的id
|
||||
cursor = 0 :: integer(),
|
||||
%% 数据缓存队列
|
||||
queue = queue:new(),
|
||||
|
||||
%% 定时器
|
||||
timer_ref :: undefined | reference(),
|
||||
%% 是否繁忙
|
||||
@ -77,7 +78,7 @@ init([]) ->
|
||||
%% 启动日志记录器
|
||||
{ok, LoggerPid} = iot_logger:start_link("north_data"),
|
||||
|
||||
{ok, disconnected, #state{mqtt_opts = Opts, postman_pid = undefined, logger_pid = LoggerPid}}.
|
||||
{ok, disconnected, #state{mqtt_opts = Opts, queue = queue:new(), postman_pid = undefined, logger_pid = LoggerPid}}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_statem when it needs to find out
|
||||
@ -91,14 +92,18 @@ callback_mode() ->
|
||||
%% functions is called when gen_statem receives and event from
|
||||
%% call/2, cast/2, or as a normal process message.
|
||||
|
||||
handle_event(cast, {forward, LocationCode, DynamicLocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy}) ->
|
||||
mnesia_queue:insert(#north_data{location_code = LocationCode, dynamic_location_code = DynamicLocationCode, fields = Fields, timestamp = Timestamp}),
|
||||
%% 避免不必要的内部消息
|
||||
Actions = case StateName =:= connected andalso not IsBusy of
|
||||
true -> [{next_event, info, fetch_next}];
|
||||
false -> []
|
||||
end,
|
||||
{keep_state, State, Actions};
|
||||
handle_event(cast, {forward, LocationCode, DynamicLocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy, queue = Q}) ->
|
||||
case format_data(LocationCode, DynamicLocationCode, Fields, Timestamp) of
|
||||
{ok, Body} ->
|
||||
%% 避免不必要的内部消息
|
||||
Actions = case StateName =:= connected andalso not IsBusy of
|
||||
true -> [{next_event, info, fetch_next}];
|
||||
false -> []
|
||||
end,
|
||||
{keep_state, State#state{queue = queue:in(Body, Q)}, Actions};
|
||||
error ->
|
||||
{keep_state, State}
|
||||
end;
|
||||
|
||||
%% 触发读取下一条数据
|
||||
handle_event(info, fetch_next, disconnected, State) ->
|
||||
@ -106,25 +111,23 @@ handle_event(info, fetch_next, disconnected, State) ->
|
||||
{keep_state, State};
|
||||
handle_event(info, fetch_next, connected, State = #state{is_busy = true}) ->
|
||||
{keep_state, State};
|
||||
handle_event(info, fetch_next, connected, State = #state{postman_pid = PostmanPid, cursor = Cursor}) ->
|
||||
case mnesia_queue:dirty_fetch_next(Cursor) of
|
||||
{ok, NCursor, NorthData} ->
|
||||
lager:debug("[iot_zd_endpoint] fetch_next success, north data is: ~p", [NorthData]),
|
||||
do_post(PostmanPid, NorthData),
|
||||
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}),
|
||||
handle_event(info, fetch_next, connected, State = #state{postman_pid = PostmanPid, queue = Q}) ->
|
||||
case queue:out(Q) of
|
||||
{{value, Body}, Q1} ->
|
||||
lager:debug("[iot_zd_endpoint] fetch_next success, north data is: ~p", [Body]),
|
||||
PostmanPid ! {post, self(), Body},
|
||||
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, Body}),
|
||||
|
||||
{keep_state, State#state{cursor = NCursor, timer_ref = TimerRef, is_busy = true}};
|
||||
'$end_of_table' ->
|
||||
{keep_state, State#state{queue = Q1, timer_ref = TimerRef, is_busy = true}};
|
||||
{empty, _} ->
|
||||
{keep_state, State}
|
||||
end;
|
||||
|
||||
%% 收到确认消息
|
||||
handle_event(info, {ack, Id, AssocMessage}, StateName, State = #state{timer_ref = TimerRef, acc_num = AccNum, logger_pid = LoggerPid}) ->
|
||||
handle_event(info, {ack, AssocMessage}, StateName, State = #state{timer_ref = TimerRef, acc_num = AccNum, logger_pid = LoggerPid}) ->
|
||||
%% 记录日志信息
|
||||
iot_logger:write(LoggerPid, AssocMessage),
|
||||
|
||||
ok = mnesia_queue:delete(Id),
|
||||
lager:debug("[iot_zd_endpoint] get ack: ~p", [Id]),
|
||||
Actions = case StateName =:= connected of
|
||||
true -> [{next_event, info, fetch_next}];
|
||||
false -> []
|
||||
@ -134,12 +137,12 @@ handle_event(info, {ack, Id, AssocMessage}, StateName, State = #state{timer_ref
|
||||
{keep_state, State#state{timer_ref = undefined, acc_num = AccNum + 1, is_busy = false}, Actions};
|
||||
|
||||
%% 收到重发过期请求
|
||||
handle_event(info, {timeout, _, {repost_ticker, NorthData}}, connected, State = #state{postman_pid = PostmanPid}) ->
|
||||
lager:debug("[iot_zd_endpoint] repost data: ~p", [NorthData]),
|
||||
do_post(PostmanPid, NorthData),
|
||||
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}),
|
||||
handle_event(info, {timeout, _, {repost_ticker, Body}}, connected, State = #state{postman_pid = PostmanPid}) ->
|
||||
lager:debug("[iot_zd_endpoint] repost data: ~p", [Body]),
|
||||
PostmanPid ! {post, self(), Body},
|
||||
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, Body}),
|
||||
|
||||
{keep_state, State#state{timer_ref = TimerRef}};
|
||||
{keep_state, State#state{timer_ref = TimerRef, is_busy = true}};
|
||||
|
||||
%% 离线时,忽略超时逻辑
|
||||
handle_event(info, {timeout, _, {repost_ticker, _}}, disconnected, State) ->
|
||||
@ -158,10 +161,10 @@ handle_event(info, {timeout, _, create_postman}, disconnected, State = #state{mq
|
||||
end;
|
||||
|
||||
%% 获取当前统计信息
|
||||
handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum}) ->
|
||||
handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum, queue = Q}) ->
|
||||
Stat = #{
|
||||
<<"acc_num">> => AccNum,
|
||||
<<"queue_num">> => mnesia_queue:table_size(),
|
||||
<<"queue_num">> => queue:len(Q),
|
||||
<<"state_name">> => atom_to_binary(StateName)
|
||||
},
|
||||
{keep_state, State, [{reply, From, Stat}]};
|
||||
@ -227,8 +230,8 @@ create_postman(Opts) ->
|
||||
|
||||
mqtt_postman:start_link(PostmanOpts, list_to_binary(Topic0), Qos).
|
||||
|
||||
-spec do_post(PostmanPid :: pid(), NorthData :: #north_data{}) -> no_return().
|
||||
do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, dynamic_location_code = DynamicLocationCode, fields = Fields, timestamp = Timestamp}) when is_pid(PostmanPid) ->
|
||||
-spec format_data(LocationCode :: binary(), DynamicLocationCode :: binary(), Fields :: list() | binary(), Timestamp :: integer()) -> {ok, Body :: binary()} | error.
|
||||
format_data(LocationCode, DynamicLocationCode, Fields, Timestamp) when is_binary(LocationCode), is_binary(DynamicLocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) ->
|
||||
Data = #{
|
||||
<<"version">> => <<"1.0">>,
|
||||
<<"location_code">> => LocationCode,
|
||||
@ -238,7 +241,7 @@ do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, dynamic_l
|
||||
},
|
||||
try
|
||||
Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])),
|
||||
PostmanPid ! {post, self(), {Id, Body}}
|
||||
{ok, Body}
|
||||
catch _:_ ->
|
||||
self() ! {ack, Id, <<"json error">>}
|
||||
error
|
||||
end.
|
||||
@ -74,7 +74,7 @@ start_mnesia() ->
|
||||
ok = mnesia:start(),
|
||||
Tables = mnesia:system_info(tables),
|
||||
|
||||
LoadTables = [id_generator, totalizator, 'queue_data:zhongdian'],
|
||||
LoadTables = [totalizator],
|
||||
case lists:all(fun(Tab) -> lists:member(Tab, Tables) end, LoadTables) of
|
||||
true ->
|
||||
lager:debug("[iot_app] waiting for mnesia start: ~p", [LoadTables]),
|
||||
@ -93,9 +93,7 @@ start_mnesia() ->
|
||||
|
||||
%% 创建数据库表
|
||||
%% id生成器
|
||||
mnesia_id_generator:create_table(),
|
||||
%% 数据转发缓存表
|
||||
mnesia_queue:create_table(),
|
||||
%% mnesia_id_generator:create_table(),
|
||||
%% 大数据统计表
|
||||
mnesia_totalizator:create_table()
|
||||
end.
|
||||
@ -1,61 +0,0 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 26. 7月 2023 10:40
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(mnesia_queue).
|
||||
-author("aresei").
|
||||
-include("iot.hrl").
|
||||
|
||||
-define(TAB_NAME, 'queue_data:zhongdian').
|
||||
|
||||
%% API
|
||||
-export([create_table/0]).
|
||||
-export([insert/1, delete/1, table_size/0, dirty_fetch_next/1]).
|
||||
|
||||
create_table() ->
|
||||
%% 数据转发缓存表
|
||||
mnesia:create_table(?TAB_NAME, [
|
||||
{attributes, record_info(fields, north_data)},
|
||||
{record_name, north_data},
|
||||
{disc_copies, [node()]},
|
||||
{type, ordered_set}
|
||||
]).
|
||||
|
||||
-spec insert(#north_data{}) -> ok | {error, Reason :: any()}.
|
||||
insert(Item = #north_data{}) ->
|
||||
Id = mnesia_id_generator:next_id(?TAB_NAME),
|
||||
NItem = Item#north_data{id = Id},
|
||||
case mnesia:transaction(fun() -> mnesia:write(?TAB_NAME, NItem, write) end) of
|
||||
{atomic, ok} ->
|
||||
ok;
|
||||
{aborted, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
-spec delete(Key :: any()) -> ok | {error, Reason :: any()}.
|
||||
delete(Key) when is_integer(Key) ->
|
||||
case mnesia:transaction(fun() -> mnesia:delete(?TAB_NAME, Key, write) end) of
|
||||
{atomic, ok} ->
|
||||
ok;
|
||||
{aborted, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
-spec table_size() -> integer().
|
||||
table_size() ->
|
||||
mnesia:table_info(?TAB_NAME, size).
|
||||
|
||||
-spec dirty_fetch_next(Cursor :: integer()) ->
|
||||
{ok, NCursor :: integer(), Item :: any()} | '$end_of_table'.
|
||||
dirty_fetch_next(Cursor) when is_integer(Cursor) ->
|
||||
case mnesia:dirty_next(?TAB_NAME, Cursor) of
|
||||
'$end_of_table' ->
|
||||
'$end_of_table';
|
||||
NextKey ->
|
||||
[Item] = mnesia:dirty_read(?TAB_NAME, NextKey),
|
||||
{ok, NextKey, Item}
|
||||
end.
|
||||
@ -89,22 +89,22 @@ handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload}},
|
||||
{noreply, State};
|
||||
handle_info({puback, #{packet_id := PacketId}}, State = #state{inflight = Inflight}) ->
|
||||
case maps:take(PacketId, Inflight) of
|
||||
{{Id, ReceiverPid, AssocMessage}, RestInflight} ->
|
||||
ReceiverPid ! {ack, Id, AssocMessage},
|
||||
{{ReceiverPid, AssocMessage}, RestInflight} ->
|
||||
ReceiverPid ! {ack, AssocMessage},
|
||||
{noreply, State#state{inflight = RestInflight}};
|
||||
error ->
|
||||
{noreply, State}
|
||||
end;
|
||||
|
||||
%% 转发信息
|
||||
handle_info({post, ReceiverPid, {Id, Message}}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic, qos = Qos}) ->
|
||||
handle_info({post, ReceiverPid, Message}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic, qos = Qos}) ->
|
||||
lager:debug("[mqtt_postman] will publish topic: ~p, message: ~ts, qos: ~p", [Topic, Message, Qos]),
|
||||
case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of
|
||||
ok ->
|
||||
ReceiverPid ! {ack, Id, Message},
|
||||
ReceiverPid ! {ack, Message},
|
||||
{noreply, State};
|
||||
{ok, PacketId} ->
|
||||
{noreply, State#state{inflight = maps:put(PacketId, {Id, ReceiverPid, Message}, InFlight)}};
|
||||
{noreply, State#state{inflight = maps:put(PacketId, {ReceiverPid, Message}, InFlight)}};
|
||||
{error, Reason} ->
|
||||
lager:warning("[mqtt_postman] send message to topic: ~p, get error: ~p", [Topic, Reason]),
|
||||
{stop, Reason, State}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user