From a2e320a6a990fe6bf112787d9308169532652e15 Mon Sep 17 00:00:00 2001 From: anlicheng Date: Thu, 17 Aug 2023 22:58:42 +0800 Subject: [PATCH] fix host --- apps/iot/src/iot_endpoint.erl | 61 +++++++++++++++++------------------ 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/apps/iot/src/iot_endpoint.erl b/apps/iot/src/iot_endpoint.erl index f27c729..d18ddf7 100644 --- a/apps/iot/src/iot_endpoint.erl +++ b/apps/iot/src/iot_endpoint.erl @@ -155,26 +155,15 @@ handle_event(info, fetch_next, disconnected, State = #state{endpoint = #endpoint {keep_state, State}; handle_event(info, fetch_next, connected, State = #state{flight_num = FlightNum, window_size = WindowSize}) when FlightNum >= WindowSize -> {keep_state, State}; -handle_event(info, fetch_next, connected, State = #state{tab_name = TabName, cursor = Cursor, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}, postman_pid = PostmanPid, timer_map = TimerMap, flight_num = FlightNum}) -> +handle_event(info, fetch_next, connected, State = #state{tab_name = TabName, cursor = Cursor, endpoint = #endpoint{name = Name}, timer_map = TimerMap, flight_num = FlightNum}) -> case mnesia_queue:dirty_fetch_next(TabName, Cursor) of {ok, NCursor, NorthData = #north_data{id = Id}} -> lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]), - case safe_invoke_mapper(MapperFun, NorthData) of - {ok, Body} -> - PostmanPid ! {post, self(), make_post_data(NorthData, Body)}, - %% 重发机制, 在发送的过程中mapper可能会改变 - TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), - - {keep_state, State#state{cursor = NCursor, timer_map = maps:put(Id, TimerRef, TimerMap), flight_num = FlightNum + 1}}; - - {error, Error} -> - lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p, message discard", [Name, Error]), - mnesia_queue:delete(TabName, Id), + case do_post(NorthData, State) of + error -> {keep_state, State}; - ignore -> - lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore, messge discard", [Name]), - mnesia_queue:delete(TabName, Id), - {keep_state, State} + {ok, TimerRef} -> + {keep_state, State#state{cursor = NCursor, timer_map = maps:put(Id, TimerRef, TimerMap), flight_num = FlightNum + 1}} end; '$end_of_table' -> {keep_state, State} @@ -191,24 +180,13 @@ handle_event(info, {ack, Id}, StateName, State = #state{tab_name = TabName, endp {keep_state, State#state{timer_map = remove_timer(Id, TimerMap), acc_num = AccNum + 1, flight_num = FlightNum - 1}, Actions}; %% 收到重发过期请求 -handle_event(info, {timeout, _, {repost_ticker, NorthData = #north_data{id = Id}}}, connected, State = #state{tab_name = TabName, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}, postman_pid = PostmanPid, timer_map = TimerMap}) -> +handle_event(info, {timeout, _, {repost_ticker, NorthData = #north_data{id = Id}}}, connected, State = #state{endpoint = #endpoint{name = Name}, timer_map = TimerMap}) -> lager:debug("[iot_endpoint] endpoint: ~p, repost data: ~p", [Name, Id]), - case safe_invoke_mapper(MapperFun, NorthData) of - {ok, Body} -> - PostmanPid ! {post, self(), make_post_data(NorthData, Body)}, - %% 重发机制, 在发送的过程中mapper可能会改变 - TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), - - {keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}}; - - {error, Error} -> - lager:debug("[iot_endpoint] endpoint: ~p, repost mapper get error: ~p", [Name, Error]), - mnesia_queue:delete(TabName, Id), + case do_post(NorthData, State) of + error -> {keep_state, State}; - ignore -> - lager:debug("[iot_endpoint] endpoint: ~p, repost mapper ignore", [Name]), - mnesia_queue:delete(TabName, Id), - {keep_state, State} + {ok, TimerRef} -> + {keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}} end; %% 离线时,忽略超时逻辑 @@ -333,6 +311,25 @@ create_postman(#endpoint{config = #mysql_endpoint{host = Host, port = Port, user create_postman(#endpoint{}) -> throw(<<"not supported">>). +-spec do_post(NorthData :: #north_data{}, State :: #state{}) -> error | {ok, TimerRef :: reference()}. +do_post(NorthData = #north_data{id = Id}, #state{postman_pid = PostmanPid, tab_name = TabName, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}}) -> + lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]), + case safe_invoke_mapper(MapperFun, NorthData) of + {ok, Body} -> + PostmanPid ! {post, self(), make_post_data(NorthData, Body)}, + %% 重发机制, 在发送的过程中mapper可能会改变 + TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), + {ok, TimerRef}; + {error, Error} -> + lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p, message discard", [Name, Error]), + mnesia_queue:delete(TabName, Id), + error; + ignore -> + lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore, messge discard", [Name]), + mnesia_queue:delete(TabName, Id), + error + end. + -spec safe_invoke_mapper(MapperFun :: fun(), NorthData :: #north_data{}) -> {ok, Body :: any()} | ignore | {error, Reason :: any()}. safe_invoke_mapper(MapperFun, #north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}) ->