From 772d0fbb3f914bc9ee9ffa62c96f9b46f81de617 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 12 Jan 2024 17:31:53 +0800 Subject: [PATCH] fix zd --- apps/iot/src/endpoint/iot_zd_endpoint.erl | 55 +++++++---------------- 1 file changed, 15 insertions(+), 40 deletions(-) diff --git a/apps/iot/src/endpoint/iot_zd_endpoint.erl b/apps/iot/src/endpoint/iot_zd_endpoint.erl index eeeaefd..496f808 100644 --- a/apps/iot/src/endpoint/iot_zd_endpoint.erl +++ b/apps/iot/src/endpoint/iot_zd_endpoint.erl @@ -35,7 +35,9 @@ is_busy = false :: boolean(), %% 记录成功处理的消息数 - acc_num = 0 + acc_num = 0, + + queue = queue:new() }). %%%=================================================================== @@ -71,13 +73,11 @@ start_link() -> init([]) -> {ok, Opts} = application:get_env(iot, zhongdian), - erlang:process_flag(trap_exit, true), - %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 - erlang:start_timer(0, self(), create_postman), + {ok, PostmanPid} = create_postman(Opts), %% 启动日志记录器 - {ok, LoggerPid} = iot_logger:start_link("north_data"), + {ok, LoggerPid} = iot_logger:start_link("north_data_fix"), - {ok, disconnected, #state{mqtt_opts = Opts, postman_pid = undefined, logger_pid = LoggerPid}}. + {ok, connected, #state{mqtt_opts = Opts, postman_pid = PostmanPid, logger_pid = LoggerPid}}. %% @private %% @doc This function is called by a gen_statem when it needs to find out @@ -91,31 +91,28 @@ callback_mode() -> %% functions is called when gen_statem receives and event from %% call/2, cast/2, or as a normal process message. -handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy}) -> - mnesia_queue:insert(#north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}), +handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy, queue = Q}) -> + Q1 = queue:in(#north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}, Q), %% 避免不必要的内部消息 Actions = case StateName =:= connected andalso not IsBusy of true -> [{next_event, info, fetch_next}]; false -> [] end, - {keep_state, State, Actions}; + {keep_state, State#state{queue = Q1}, Actions}; %% 触发读取下一条数据 -handle_event(info, fetch_next, disconnected, State) -> - lager:debug("[iot_zd_endpoint] fetch_next postman offline, data in queue"), - {keep_state, State}; handle_event(info, fetch_next, connected, State = #state{is_busy = true}) -> {keep_state, State}; -handle_event(info, fetch_next, connected, State = #state{postman_pid = PostmanPid, cursor = Cursor}) -> - case mnesia_queue:dirty_fetch_next(Cursor) of - {ok, NCursor, NorthData} -> +handle_event(info, fetch_next, connected, State = #state{postman_pid = PostmanPid, queue = Q}) -> + case queue:out(Q) of + {{value, NorthData}, Q1} -> lager:debug("[iot_zd_endpoint] fetch_next success, north data is: ~p", [NorthData]), do_post(PostmanPid, NorthData), TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), - {keep_state, State#state{cursor = NCursor, timer_ref = TimerRef, is_busy = true}}; - '$end_of_table' -> - {keep_state, State} + {keep_state, State#state{queue = Q1, timer_ref = TimerRef, is_busy = true}}; + {empty, Q1} -> + {keep_state, State#state{queue = Q1}} end; %% 收到确认消息 @@ -123,7 +120,6 @@ handle_event(info, {ack, Id, AssocMessage}, StateName, State = #state{timer_ref %% 记录日志信息 iot_logger:write(LoggerPid, AssocMessage), - ok = mnesia_queue:delete(Id), lager:debug("[iot_zd_endpoint] get ack: ~p", [Id]), Actions = case StateName =:= connected of true -> [{next_event, info, fetch_next}]; @@ -145,35 +141,14 @@ handle_event(info, {timeout, _, {repost_ticker, NorthData}}, connected, State = handle_event(info, {timeout, _, {repost_ticker, _}}, disconnected, State) -> {keep_state, State}; -handle_event(info, {timeout, _, create_postman}, disconnected, State = #state{mqtt_opts = Opts}) -> - lager:debug("[iot_zd_endpoint] create postman"), - try - {ok, PostmanPid} = create_postman(Opts), - {next_state, connected, State#state{postman_pid = PostmanPid, timer_ref = undefined, is_busy = false}, [{next_event, info, fetch_next}]} - catch _:Error:Stack -> - lager:warning("[iot_zd_endpoint] config: ~p, create postman get error: ~p, stack: ~p", [Opts, Error, Stack]), - erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), - - {keep_state, State#state{postman_pid = undefined}} - end; - %% 获取当前统计信息 handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum}) -> Stat = #{ <<"acc_num">> => AccNum, - <<"queue_num">> => mnesia_queue:table_size(), <<"state_name">> => atom_to_binary(StateName) }, {keep_state, State, [{reply, From, Stat}]}; -%% postman进程挂掉时,重新建立新的 -handle_event(info, {'EXIT', PostmanPid, Reason}, connected, State = #state{timer_ref = TimerRef, postman_pid = PostmanPid}) -> - lager:warning("[iot_zd_endpoint] postman exited with reason: ~p", [Reason]), - is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), - erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), - - {next_state, disconnected, State#state{timer_ref = undefined, postman_pid = undefined}}; - %% @private %% @doc If callback_mode is handle_event_function, then whenever a %% gen_statem receives an event from call/2, cast/2, or as a normal