From 0cc1e3e542c3f3a888a219017bddeef0b5616120 Mon Sep 17 00:00:00 2001 From: anlicheng Date: Thu, 10 Aug 2023 15:48:03 +0800 Subject: [PATCH] fix endpoint --- apps/iot/src/iot_endpoint.erl | 46 ++++++++++++++-------------- apps/iot/src/mnesia/mnesia_queue.erl | 3 +- 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/apps/iot/src/iot_endpoint.erl b/apps/iot/src/iot_endpoint.erl index 34aa862..f27c729 100644 --- a/apps/iot/src/iot_endpoint.erl +++ b/apps/iot/src/iot_endpoint.erl @@ -100,7 +100,7 @@ init([Endpoint = #endpoint{name = Name}]) -> {ok, disconnected, #state{endpoint = Endpoint, tab_name = TabName, postman_pid = undefined}} catch _:Error -> - lager:warning("[iot_endpoint] endpoint: ~p, init get error: ~p", [Name, Error]), + lager:warning("[iot_endpoint] endpoint: ~p, init get error: ~p, ignore", [Name, Error]), ignore end. @@ -156,33 +156,27 @@ handle_event(info, fetch_next, disconnected, State = #state{endpoint = #endpoint handle_event(info, fetch_next, connected, State = #state{flight_num = FlightNum, window_size = WindowSize}) when FlightNum >= WindowSize -> {keep_state, State}; handle_event(info, fetch_next, connected, State = #state{tab_name = TabName, cursor = Cursor, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}, postman_pid = PostmanPid, timer_map = TimerMap, flight_num = FlightNum}) -> - case catch mnesia_queue:dirty_fetch_next(TabName, Cursor) of - {ok, NCursor, NorthData = #north_data{id = Id, location_code = LocationCode}} -> + case mnesia_queue:dirty_fetch_next(TabName, Cursor) of + {ok, NCursor, NorthData = #north_data{id = Id}} -> lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]), case safe_invoke_mapper(MapperFun, NorthData) of {ok, Body} -> - PostData = #post_data{id = Id, body = Body, location_code = LocationCode}, - - PostmanPid ! {post, self(), PostData}, + PostmanPid ! {post, self(), make_post_data(NorthData, Body)}, %% 重发机制, 在发送的过程中mapper可能会改变 TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), {keep_state, State#state{cursor = NCursor, timer_map = maps:put(Id, TimerRef, TimerMap), flight_num = FlightNum + 1}}; {error, Error} -> - lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p", [Name, Error]), + lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p, message discard", [Name, Error]), mnesia_queue:delete(TabName, Id), {keep_state, State}; ignore -> - lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore", [Name]), + lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore, messge discard", [Name]), mnesia_queue:delete(TabName, Id), {keep_state, State} end; '$end_of_table' -> - lager:debug("[iot_endpoint] endpoint:~p, fetch_next failed $end_of_table", [Name]), - {keep_state, State}; - Error -> - lager:debug("[iot_endpoint] endpoint:~p, fetch_next get error: ~p", [Name, Error]), {keep_state, State} end; @@ -197,11 +191,11 @@ handle_event(info, {ack, Id}, StateName, State = #state{tab_name = TabName, endp {keep_state, State#state{timer_map = remove_timer(Id, TimerMap), acc_num = AccNum + 1, flight_num = FlightNum - 1}, Actions}; %% 收到重发过期请求 -handle_event(info, {timeout, _, {repost_ticker, NorthData = #north_data{id = Id, location_code = LocationCode}}}, connected, State = #state{tab_name = TabName, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}, postman_pid = PostmanPid, timer_map = TimerMap}) -> +handle_event(info, {timeout, _, {repost_ticker, NorthData = #north_data{id = Id}}}, connected, State = #state{tab_name = TabName, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}, postman_pid = PostmanPid, timer_map = TimerMap}) -> lager:debug("[iot_endpoint] endpoint: ~p, repost data: ~p", [Name, Id]), case safe_invoke_mapper(MapperFun, NorthData) of {ok, Body} -> - PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}}, + PostmanPid ! {post, self(), make_post_data(NorthData, Body)}, %% 重发机制, 在发送的过程中mapper可能会改变 TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), @@ -252,7 +246,7 @@ handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum, }, {keep_state, State, [{reply, From, Stat}]}; -%% 所有未确认的消息进入队列里面, 这里不保证消息的顺序 +%% postman进程挂掉时,重新建立新的 handle_event(info, {'EXIT', PostmanPid, Reason}, connected, State = #state{endpoint = #endpoint{name = Name}, timer_map = TimerMap, postman_pid = PostmanPid}) -> lager:warning("[iot_endpoint] endpoint: ~p, postman exited with reason: ~p", [Name, Reason]), lists:foreach(fun({_, TimerRef}) -> catch erlang:cancel_timer(TimerRef) end, maps:to_list(TimerMap)), @@ -273,8 +267,8 @@ handle_event(EventType, Event, StateName, State) -> %% terminate. It should be the opposite of Module:init/1 and do any %% necessary cleaning up. When it returns, the gen_statem terminates with %% Reason. The return value is ignored. -terminate(Reason, _StateName, _State) -> - lager:debug("[iot_endpoint] terminate with reason: ~p", [Reason]), +terminate(Reason, _StateName, #state{endpoint = #endpoint{name = Name}}) -> + lager:debug("[iot_endpoint] endpoint: ~p, terminate with reason: ~p", [Name, Reason]), ok. %% @private @@ -291,9 +285,9 @@ remove_timer(Id, TimerMap) when is_integer(Id), is_map(TimerMap) -> case maps:take(Id, TimerMap) of error -> TimerMap; - {TimerRef, TimerMap0} -> - catch erlang:cancel_timer(TimerRef), - TimerMap0 + {TimerRef, NTimerMap} -> + is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), + NTimerMap end. %% 对http和https协议的支持 @@ -310,8 +304,8 @@ create_postman(#endpoint{name = Name, config = #mqtt_endpoint{host = Host, port {host, binary_to_list(Host)}, {port, Port}, {tcp_opts, []}, - {username, unicode:characters_to_list(Username)}, - {password, unicode:characters_to_list(Password)}, + {username, binary_to_list(Username)}, + {password, binary_to_list(Password)}, {keepalive, 86400}, {auto_ack, true}, {connect_timeout, 5000}, @@ -339,6 +333,8 @@ create_postman(#endpoint{config = #mysql_endpoint{host = Host, port = Port, user create_postman(#endpoint{}) -> throw(<<"not supported">>). +-spec safe_invoke_mapper(MapperFun :: fun(), NorthData :: #north_data{}) -> + {ok, Body :: any()} | ignore | {error, Reason :: any()}. safe_invoke_mapper(MapperFun, #north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}) -> try if @@ -349,4 +345,8 @@ safe_invoke_mapper(MapperFun, #north_data{location_code = LocationCode, fields = end catch _:Error -> {error, Error} - end. \ No newline at end of file + end. + +-spec make_post_data(NorthData :: #north_data{}, Body :: any()) -> PostData :: #post_data{}. +make_post_data(#north_data{id = Id, location_code = LocationCode}, Body) -> + #post_data{id = Id, location_code = LocationCode, body = Body}. \ No newline at end of file diff --git a/apps/iot/src/mnesia/mnesia_queue.erl b/apps/iot/src/mnesia/mnesia_queue.erl index a55947d..0d15267 100644 --- a/apps/iot/src/mnesia/mnesia_queue.erl +++ b/apps/iot/src/mnesia/mnesia_queue.erl @@ -53,7 +53,8 @@ ensure_queue(Name) when is_atom(Name) -> table_size(Tab) when is_atom(Tab) -> mnesia:table_info(Tab, size). --spec dirty_fetch_next(Tab :: atom(), Cursor :: integer()) -> {ok, NCursor :: integer(), Item :: any()} | '$end_of_table'. +-spec dirty_fetch_next(Tab :: atom(), Cursor :: integer()) -> + {ok, NCursor :: integer(), Item :: any()} | '$end_of_table'. dirty_fetch_next(Tab, Cursor) when is_atom(Tab), is_integer(Cursor) -> case mnesia:dirty_next(Tab, Cursor) of '$end_of_table' ->