fix host
This commit is contained in:
parent
83a8882b4e
commit
a2e320a6a9
@ -155,26 +155,15 @@ handle_event(info, fetch_next, disconnected, State = #state{endpoint = #endpoint
|
||||
{keep_state, State};
|
||||
handle_event(info, fetch_next, connected, State = #state{flight_num = FlightNum, window_size = WindowSize}) when FlightNum >= WindowSize ->
|
||||
{keep_state, State};
|
||||
handle_event(info, fetch_next, connected, State = #state{tab_name = TabName, cursor = Cursor, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}, postman_pid = PostmanPid, timer_map = TimerMap, flight_num = FlightNum}) ->
|
||||
handle_event(info, fetch_next, connected, State = #state{tab_name = TabName, cursor = Cursor, endpoint = #endpoint{name = Name}, timer_map = TimerMap, flight_num = FlightNum}) ->
|
||||
case mnesia_queue:dirty_fetch_next(TabName, Cursor) of
|
||||
{ok, NCursor, NorthData = #north_data{id = Id}} ->
|
||||
lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]),
|
||||
case safe_invoke_mapper(MapperFun, NorthData) of
|
||||
{ok, Body} ->
|
||||
PostmanPid ! {post, self(), make_post_data(NorthData, Body)},
|
||||
%% 重发机制, 在发送的过程中mapper可能会改变
|
||||
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}),
|
||||
|
||||
{keep_state, State#state{cursor = NCursor, timer_map = maps:put(Id, TimerRef, TimerMap), flight_num = FlightNum + 1}};
|
||||
|
||||
{error, Error} ->
|
||||
lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p, message discard", [Name, Error]),
|
||||
mnesia_queue:delete(TabName, Id),
|
||||
case do_post(NorthData, State) of
|
||||
error ->
|
||||
{keep_state, State};
|
||||
ignore ->
|
||||
lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore, messge discard", [Name]),
|
||||
mnesia_queue:delete(TabName, Id),
|
||||
{keep_state, State}
|
||||
{ok, TimerRef} ->
|
||||
{keep_state, State#state{cursor = NCursor, timer_map = maps:put(Id, TimerRef, TimerMap), flight_num = FlightNum + 1}}
|
||||
end;
|
||||
'$end_of_table' ->
|
||||
{keep_state, State}
|
||||
@ -191,24 +180,13 @@ handle_event(info, {ack, Id}, StateName, State = #state{tab_name = TabName, endp
|
||||
{keep_state, State#state{timer_map = remove_timer(Id, TimerMap), acc_num = AccNum + 1, flight_num = FlightNum - 1}, Actions};
|
||||
|
||||
%% 收到重发过期请求
|
||||
handle_event(info, {timeout, _, {repost_ticker, NorthData = #north_data{id = Id}}}, connected, State = #state{tab_name = TabName, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}, postman_pid = PostmanPid, timer_map = TimerMap}) ->
|
||||
handle_event(info, {timeout, _, {repost_ticker, NorthData = #north_data{id = Id}}}, connected, State = #state{endpoint = #endpoint{name = Name}, timer_map = TimerMap}) ->
|
||||
lager:debug("[iot_endpoint] endpoint: ~p, repost data: ~p", [Name, Id]),
|
||||
case safe_invoke_mapper(MapperFun, NorthData) of
|
||||
{ok, Body} ->
|
||||
PostmanPid ! {post, self(), make_post_data(NorthData, Body)},
|
||||
%% 重发机制, 在发送的过程中mapper可能会改变
|
||||
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}),
|
||||
|
||||
{keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}};
|
||||
|
||||
{error, Error} ->
|
||||
lager:debug("[iot_endpoint] endpoint: ~p, repost mapper get error: ~p", [Name, Error]),
|
||||
mnesia_queue:delete(TabName, Id),
|
||||
case do_post(NorthData, State) of
|
||||
error ->
|
||||
{keep_state, State};
|
||||
ignore ->
|
||||
lager:debug("[iot_endpoint] endpoint: ~p, repost mapper ignore", [Name]),
|
||||
mnesia_queue:delete(TabName, Id),
|
||||
{keep_state, State}
|
||||
{ok, TimerRef} ->
|
||||
{keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}}
|
||||
end;
|
||||
|
||||
%% 离线时,忽略超时逻辑
|
||||
@ -333,6 +311,25 @@ create_postman(#endpoint{config = #mysql_endpoint{host = Host, port = Port, user
|
||||
create_postman(#endpoint{}) ->
|
||||
throw(<<"not supported">>).
|
||||
|
||||
-spec do_post(NorthData :: #north_data{}, State :: #state{}) -> error | {ok, TimerRef :: reference()}.
|
||||
do_post(NorthData = #north_data{id = Id}, #state{postman_pid = PostmanPid, tab_name = TabName, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}}) ->
|
||||
lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]),
|
||||
case safe_invoke_mapper(MapperFun, NorthData) of
|
||||
{ok, Body} ->
|
||||
PostmanPid ! {post, self(), make_post_data(NorthData, Body)},
|
||||
%% 重发机制, 在发送的过程中mapper可能会改变
|
||||
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}),
|
||||
{ok, TimerRef};
|
||||
{error, Error} ->
|
||||
lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p, message discard", [Name, Error]),
|
||||
mnesia_queue:delete(TabName, Id),
|
||||
error;
|
||||
ignore ->
|
||||
lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore, messge discard", [Name]),
|
||||
mnesia_queue:delete(TabName, Id),
|
||||
error
|
||||
end.
|
||||
|
||||
-spec safe_invoke_mapper(MapperFun :: fun(), NorthData :: #north_data{}) ->
|
||||
{ok, Body :: any()} | ignore | {error, Reason :: any()}.
|
||||
safe_invoke_mapper(MapperFun, #north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}) ->
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user