From 40860f197a21491a1e5dc1cf73093e7b6b7f89bf Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 31 Jul 2024 16:10:01 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AE=80=E5=8C=96=E4=B8=AD=E7=94=B5=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E7=BC=93=E5=AD=98=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/iot/src/endpoint/iot_zd_endpoint.erl | 69 ++++++++++++----------- apps/iot/src/iot_app.erl | 6 +- apps/iot/src/mnesia/mnesia_queue.erl | 61 -------------------- apps/iot/src/postman/mqtt_postman.erl | 10 ++-- 4 files changed, 43 insertions(+), 103 deletions(-) delete mode 100644 apps/iot/src/mnesia/mnesia_queue.erl diff --git a/apps/iot/src/endpoint/iot_zd_endpoint.erl b/apps/iot/src/endpoint/iot_zd_endpoint.erl index 2126fee..254cb5f 100644 --- a/apps/iot/src/endpoint/iot_zd_endpoint.erl +++ b/apps/iot/src/endpoint/iot_zd_endpoint.erl @@ -27,8 +27,9 @@ postman_pid :: undefined | pid(), logger_pid :: pid(), - %% 当前数据的游标, #north_data的id - cursor = 0 :: integer(), + %% 数据缓存队列 + queue = queue:new(), + %% 定时器 timer_ref :: undefined | reference(), %% 是否繁忙 @@ -77,7 +78,7 @@ init([]) -> %% 启动日志记录器 {ok, LoggerPid} = iot_logger:start_link("north_data"), - {ok, disconnected, #state{mqtt_opts = Opts, postman_pid = undefined, logger_pid = LoggerPid}}. + {ok, disconnected, #state{mqtt_opts = Opts, queue = queue:new(), postman_pid = undefined, logger_pid = LoggerPid}}. %% @private %% @doc This function is called by a gen_statem when it needs to find out @@ -91,14 +92,18 @@ callback_mode() -> %% functions is called when gen_statem receives and event from %% call/2, cast/2, or as a normal process message. -handle_event(cast, {forward, LocationCode, DynamicLocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy}) -> - mnesia_queue:insert(#north_data{location_code = LocationCode, dynamic_location_code = DynamicLocationCode, fields = Fields, timestamp = Timestamp}), - %% 避免不必要的内部消息 - Actions = case StateName =:= connected andalso not IsBusy of - true -> [{next_event, info, fetch_next}]; - false -> [] - end, - {keep_state, State, Actions}; +handle_event(cast, {forward, LocationCode, DynamicLocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy, queue = Q}) -> + case format_data(LocationCode, DynamicLocationCode, Fields, Timestamp) of + {ok, Body} -> + %% 避免不必要的内部消息 + Actions = case StateName =:= connected andalso not IsBusy of + true -> [{next_event, info, fetch_next}]; + false -> [] + end, + {keep_state, State#state{queue = queue:in(Body, Q)}, Actions}; + error -> + {keep_state, State} + end; %% 触发读取下一条数据 handle_event(info, fetch_next, disconnected, State) -> @@ -106,25 +111,23 @@ handle_event(info, fetch_next, disconnected, State) -> {keep_state, State}; handle_event(info, fetch_next, connected, State = #state{is_busy = true}) -> {keep_state, State}; -handle_event(info, fetch_next, connected, State = #state{postman_pid = PostmanPid, cursor = Cursor}) -> - case mnesia_queue:dirty_fetch_next(Cursor) of - {ok, NCursor, NorthData} -> - lager:debug("[iot_zd_endpoint] fetch_next success, north data is: ~p", [NorthData]), - do_post(PostmanPid, NorthData), - TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), +handle_event(info, fetch_next, connected, State = #state{postman_pid = PostmanPid, queue = Q}) -> + case queue:out(Q) of + {{value, Body}, Q1} -> + lager:debug("[iot_zd_endpoint] fetch_next success, north data is: ~p", [Body]), + PostmanPid ! {post, self(), Body}, + TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, Body}), - {keep_state, State#state{cursor = NCursor, timer_ref = TimerRef, is_busy = true}}; - '$end_of_table' -> + {keep_state, State#state{queue = Q1, timer_ref = TimerRef, is_busy = true}}; + {empty, _} -> {keep_state, State} end; %% 收到确认消息 -handle_event(info, {ack, Id, AssocMessage}, StateName, State = #state{timer_ref = TimerRef, acc_num = AccNum, logger_pid = LoggerPid}) -> +handle_event(info, {ack, AssocMessage}, StateName, State = #state{timer_ref = TimerRef, acc_num = AccNum, logger_pid = LoggerPid}) -> %% 记录日志信息 iot_logger:write(LoggerPid, AssocMessage), - ok = mnesia_queue:delete(Id), - lager:debug("[iot_zd_endpoint] get ack: ~p", [Id]), Actions = case StateName =:= connected of true -> [{next_event, info, fetch_next}]; false -> [] @@ -134,12 +137,12 @@ handle_event(info, {ack, Id, AssocMessage}, StateName, State = #state{timer_ref {keep_state, State#state{timer_ref = undefined, acc_num = AccNum + 1, is_busy = false}, Actions}; %% 收到重发过期请求 -handle_event(info, {timeout, _, {repost_ticker, NorthData}}, connected, State = #state{postman_pid = PostmanPid}) -> - lager:debug("[iot_zd_endpoint] repost data: ~p", [NorthData]), - do_post(PostmanPid, NorthData), - TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), +handle_event(info, {timeout, _, {repost_ticker, Body}}, connected, State = #state{postman_pid = PostmanPid}) -> + lager:debug("[iot_zd_endpoint] repost data: ~p", [Body]), + PostmanPid ! {post, self(), Body}, + TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, Body}), - {keep_state, State#state{timer_ref = TimerRef}}; + {keep_state, State#state{timer_ref = TimerRef, is_busy = true}}; %% 离线时,忽略超时逻辑 handle_event(info, {timeout, _, {repost_ticker, _}}, disconnected, State) -> @@ -158,10 +161,10 @@ handle_event(info, {timeout, _, create_postman}, disconnected, State = #state{mq end; %% 获取当前统计信息 -handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum}) -> +handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum, queue = Q}) -> Stat = #{ <<"acc_num">> => AccNum, - <<"queue_num">> => mnesia_queue:table_size(), + <<"queue_num">> => queue:len(Q), <<"state_name">> => atom_to_binary(StateName) }, {keep_state, State, [{reply, From, Stat}]}; @@ -227,8 +230,8 @@ create_postman(Opts) -> mqtt_postman:start_link(PostmanOpts, list_to_binary(Topic0), Qos). --spec do_post(PostmanPid :: pid(), NorthData :: #north_data{}) -> no_return(). -do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, dynamic_location_code = DynamicLocationCode, fields = Fields, timestamp = Timestamp}) when is_pid(PostmanPid) -> +-spec format_data(LocationCode :: binary(), DynamicLocationCode :: binary(), Fields :: list() | binary(), Timestamp :: integer()) -> {ok, Body :: binary()} | error. +format_data(LocationCode, DynamicLocationCode, Fields, Timestamp) when is_binary(LocationCode), is_binary(DynamicLocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) -> Data = #{ <<"version">> => <<"1.0">>, <<"location_code">> => LocationCode, @@ -238,7 +241,7 @@ do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, dynamic_l }, try Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])), - PostmanPid ! {post, self(), {Id, Body}} + {ok, Body} catch _:_ -> - self() ! {ack, Id, <<"json error">>} + error end. \ No newline at end of file diff --git a/apps/iot/src/iot_app.erl b/apps/iot/src/iot_app.erl index 323d913..edafae9 100644 --- a/apps/iot/src/iot_app.erl +++ b/apps/iot/src/iot_app.erl @@ -74,7 +74,7 @@ start_mnesia() -> ok = mnesia:start(), Tables = mnesia:system_info(tables), - LoadTables = [id_generator, totalizator, 'queue_data:zhongdian'], + LoadTables = [totalizator], case lists:all(fun(Tab) -> lists:member(Tab, Tables) end, LoadTables) of true -> lager:debug("[iot_app] waiting for mnesia start: ~p", [LoadTables]), @@ -93,9 +93,7 @@ start_mnesia() -> %% 创建数据库表 %% id生成器 - mnesia_id_generator:create_table(), - %% 数据转发缓存表 - mnesia_queue:create_table(), + %% mnesia_id_generator:create_table(), %% 大数据统计表 mnesia_totalizator:create_table() end. \ No newline at end of file diff --git a/apps/iot/src/mnesia/mnesia_queue.erl b/apps/iot/src/mnesia/mnesia_queue.erl deleted file mode 100644 index bae59eb..0000000 --- a/apps/iot/src/mnesia/mnesia_queue.erl +++ /dev/null @@ -1,61 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 26. 7月 2023 10:40 -%%%------------------------------------------------------------------- --module(mnesia_queue). --author("aresei"). --include("iot.hrl"). - --define(TAB_NAME, 'queue_data:zhongdian'). - -%% API --export([create_table/0]). --export([insert/1, delete/1, table_size/0, dirty_fetch_next/1]). - -create_table() -> - %% 数据转发缓存表 - mnesia:create_table(?TAB_NAME, [ - {attributes, record_info(fields, north_data)}, - {record_name, north_data}, - {disc_copies, [node()]}, - {type, ordered_set} - ]). - --spec insert(#north_data{}) -> ok | {error, Reason :: any()}. -insert(Item = #north_data{}) -> - Id = mnesia_id_generator:next_id(?TAB_NAME), - NItem = Item#north_data{id = Id}, - case mnesia:transaction(fun() -> mnesia:write(?TAB_NAME, NItem, write) end) of - {atomic, ok} -> - ok; - {aborted, Reason} -> - {error, Reason} - end. - --spec delete(Key :: any()) -> ok | {error, Reason :: any()}. -delete(Key) when is_integer(Key) -> - case mnesia:transaction(fun() -> mnesia:delete(?TAB_NAME, Key, write) end) of - {atomic, ok} -> - ok; - {aborted, Reason} -> - {error, Reason} - end. - --spec table_size() -> integer(). -table_size() -> - mnesia:table_info(?TAB_NAME, size). - --spec dirty_fetch_next(Cursor :: integer()) -> - {ok, NCursor :: integer(), Item :: any()} | '$end_of_table'. -dirty_fetch_next(Cursor) when is_integer(Cursor) -> - case mnesia:dirty_next(?TAB_NAME, Cursor) of - '$end_of_table' -> - '$end_of_table'; - NextKey -> - [Item] = mnesia:dirty_read(?TAB_NAME, NextKey), - {ok, NextKey, Item} - end. \ No newline at end of file diff --git a/apps/iot/src/postman/mqtt_postman.erl b/apps/iot/src/postman/mqtt_postman.erl index d24f31e..4ca352f 100644 --- a/apps/iot/src/postman/mqtt_postman.erl +++ b/apps/iot/src/postman/mqtt_postman.erl @@ -89,22 +89,22 @@ handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload}}, {noreply, State}; handle_info({puback, #{packet_id := PacketId}}, State = #state{inflight = Inflight}) -> case maps:take(PacketId, Inflight) of - {{Id, ReceiverPid, AssocMessage}, RestInflight} -> - ReceiverPid ! {ack, Id, AssocMessage}, + {{ReceiverPid, AssocMessage}, RestInflight} -> + ReceiverPid ! {ack, AssocMessage}, {noreply, State#state{inflight = RestInflight}}; error -> {noreply, State} end; %% 转发信息 -handle_info({post, ReceiverPid, {Id, Message}}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic, qos = Qos}) -> +handle_info({post, ReceiverPid, Message}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic, qos = Qos}) -> lager:debug("[mqtt_postman] will publish topic: ~p, message: ~ts, qos: ~p", [Topic, Message, Qos]), case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of ok -> - ReceiverPid ! {ack, Id, Message}, + ReceiverPid ! {ack, Message}, {noreply, State}; {ok, PacketId} -> - {noreply, State#state{inflight = maps:put(PacketId, {Id, ReceiverPid, Message}, InFlight)}}; + {noreply, State#state{inflight = maps:put(PacketId, {ReceiverPid, Message}, InFlight)}}; {error, Reason} -> lager:warning("[mqtt_postman] send message to topic: ~p, get error: ~p", [Topic, Reason]), {stop, Reason, State}