fix endpoint
This commit is contained in:
parent
ef6a81ce44
commit
0cc1e3e542
@ -100,7 +100,7 @@ init([Endpoint = #endpoint{name = Name}]) ->
|
||||
|
||||
{ok, disconnected, #state{endpoint = Endpoint, tab_name = TabName, postman_pid = undefined}}
|
||||
catch _:Error ->
|
||||
lager:warning("[iot_endpoint] endpoint: ~p, init get error: ~p", [Name, Error]),
|
||||
lager:warning("[iot_endpoint] endpoint: ~p, init get error: ~p, ignore", [Name, Error]),
|
||||
ignore
|
||||
end.
|
||||
|
||||
@ -156,33 +156,27 @@ handle_event(info, fetch_next, disconnected, State = #state{endpoint = #endpoint
|
||||
handle_event(info, fetch_next, connected, State = #state{flight_num = FlightNum, window_size = WindowSize}) when FlightNum >= WindowSize ->
|
||||
{keep_state, State};
|
||||
handle_event(info, fetch_next, connected, State = #state{tab_name = TabName, cursor = Cursor, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}, postman_pid = PostmanPid, timer_map = TimerMap, flight_num = FlightNum}) ->
|
||||
case catch mnesia_queue:dirty_fetch_next(TabName, Cursor) of
|
||||
{ok, NCursor, NorthData = #north_data{id = Id, location_code = LocationCode}} ->
|
||||
case mnesia_queue:dirty_fetch_next(TabName, Cursor) of
|
||||
{ok, NCursor, NorthData = #north_data{id = Id}} ->
|
||||
lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]),
|
||||
case safe_invoke_mapper(MapperFun, NorthData) of
|
||||
{ok, Body} ->
|
||||
PostData = #post_data{id = Id, body = Body, location_code = LocationCode},
|
||||
|
||||
PostmanPid ! {post, self(), PostData},
|
||||
PostmanPid ! {post, self(), make_post_data(NorthData, Body)},
|
||||
%% 重发机制, 在发送的过程中mapper可能会改变
|
||||
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}),
|
||||
|
||||
{keep_state, State#state{cursor = NCursor, timer_map = maps:put(Id, TimerRef, TimerMap), flight_num = FlightNum + 1}};
|
||||
|
||||
{error, Error} ->
|
||||
lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p", [Name, Error]),
|
||||
lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p, message discard", [Name, Error]),
|
||||
mnesia_queue:delete(TabName, Id),
|
||||
{keep_state, State};
|
||||
ignore ->
|
||||
lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore", [Name]),
|
||||
lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore, messge discard", [Name]),
|
||||
mnesia_queue:delete(TabName, Id),
|
||||
{keep_state, State}
|
||||
end;
|
||||
'$end_of_table' ->
|
||||
lager:debug("[iot_endpoint] endpoint:~p, fetch_next failed $end_of_table", [Name]),
|
||||
{keep_state, State};
|
||||
Error ->
|
||||
lager:debug("[iot_endpoint] endpoint:~p, fetch_next get error: ~p", [Name, Error]),
|
||||
{keep_state, State}
|
||||
end;
|
||||
|
||||
@ -197,11 +191,11 @@ handle_event(info, {ack, Id}, StateName, State = #state{tab_name = TabName, endp
|
||||
{keep_state, State#state{timer_map = remove_timer(Id, TimerMap), acc_num = AccNum + 1, flight_num = FlightNum - 1}, Actions};
|
||||
|
||||
%% 收到重发过期请求
|
||||
handle_event(info, {timeout, _, {repost_ticker, NorthData = #north_data{id = Id, location_code = LocationCode}}}, connected, State = #state{tab_name = TabName, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}, postman_pid = PostmanPid, timer_map = TimerMap}) ->
|
||||
handle_event(info, {timeout, _, {repost_ticker, NorthData = #north_data{id = Id}}}, connected, State = #state{tab_name = TabName, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}, postman_pid = PostmanPid, timer_map = TimerMap}) ->
|
||||
lager:debug("[iot_endpoint] endpoint: ~p, repost data: ~p", [Name, Id]),
|
||||
case safe_invoke_mapper(MapperFun, NorthData) of
|
||||
{ok, Body} ->
|
||||
PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}},
|
||||
PostmanPid ! {post, self(), make_post_data(NorthData, Body)},
|
||||
%% 重发机制, 在发送的过程中mapper可能会改变
|
||||
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}),
|
||||
|
||||
@ -252,7 +246,7 @@ handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum,
|
||||
},
|
||||
{keep_state, State, [{reply, From, Stat}]};
|
||||
|
||||
%% 所有未确认的消息进入队列里面, 这里不保证消息的顺序
|
||||
%% postman进程挂掉时,重新建立新的
|
||||
handle_event(info, {'EXIT', PostmanPid, Reason}, connected, State = #state{endpoint = #endpoint{name = Name}, timer_map = TimerMap, postman_pid = PostmanPid}) ->
|
||||
lager:warning("[iot_endpoint] endpoint: ~p, postman exited with reason: ~p", [Name, Reason]),
|
||||
lists:foreach(fun({_, TimerRef}) -> catch erlang:cancel_timer(TimerRef) end, maps:to_list(TimerMap)),
|
||||
@ -273,8 +267,8 @@ handle_event(EventType, Event, StateName, State) ->
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_statem terminates with
|
||||
%% Reason. The return value is ignored.
|
||||
terminate(Reason, _StateName, _State) ->
|
||||
lager:debug("[iot_endpoint] terminate with reason: ~p", [Reason]),
|
||||
terminate(Reason, _StateName, #state{endpoint = #endpoint{name = Name}}) ->
|
||||
lager:debug("[iot_endpoint] endpoint: ~p, terminate with reason: ~p", [Name, Reason]),
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
@ -291,9 +285,9 @@ remove_timer(Id, TimerMap) when is_integer(Id), is_map(TimerMap) ->
|
||||
case maps:take(Id, TimerMap) of
|
||||
error ->
|
||||
TimerMap;
|
||||
{TimerRef, TimerMap0} ->
|
||||
catch erlang:cancel_timer(TimerRef),
|
||||
TimerMap0
|
||||
{TimerRef, NTimerMap} ->
|
||||
is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef),
|
||||
NTimerMap
|
||||
end.
|
||||
|
||||
%% 对http和https协议的支持
|
||||
@ -310,8 +304,8 @@ create_postman(#endpoint{name = Name, config = #mqtt_endpoint{host = Host, port
|
||||
{host, binary_to_list(Host)},
|
||||
{port, Port},
|
||||
{tcp_opts, []},
|
||||
{username, unicode:characters_to_list(Username)},
|
||||
{password, unicode:characters_to_list(Password)},
|
||||
{username, binary_to_list(Username)},
|
||||
{password, binary_to_list(Password)},
|
||||
{keepalive, 86400},
|
||||
{auto_ack, true},
|
||||
{connect_timeout, 5000},
|
||||
@ -339,6 +333,8 @@ create_postman(#endpoint{config = #mysql_endpoint{host = Host, port = Port, user
|
||||
create_postman(#endpoint{}) ->
|
||||
throw(<<"not supported">>).
|
||||
|
||||
-spec safe_invoke_mapper(MapperFun :: fun(), NorthData :: #north_data{}) ->
|
||||
{ok, Body :: any()} | ignore | {error, Reason :: any()}.
|
||||
safe_invoke_mapper(MapperFun, #north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}) ->
|
||||
try
|
||||
if
|
||||
@ -349,4 +345,8 @@ safe_invoke_mapper(MapperFun, #north_data{location_code = LocationCode, fields =
|
||||
end
|
||||
catch _:Error ->
|
||||
{error, Error}
|
||||
end.
|
||||
end.
|
||||
|
||||
-spec make_post_data(NorthData :: #north_data{}, Body :: any()) -> PostData :: #post_data{}.
|
||||
make_post_data(#north_data{id = Id, location_code = LocationCode}, Body) ->
|
||||
#post_data{id = Id, location_code = LocationCode, body = Body}.
|
||||
@ -53,7 +53,8 @@ ensure_queue(Name) when is_atom(Name) ->
|
||||
table_size(Tab) when is_atom(Tab) ->
|
||||
mnesia:table_info(Tab, size).
|
||||
|
||||
-spec dirty_fetch_next(Tab :: atom(), Cursor :: integer()) -> {ok, NCursor :: integer(), Item :: any()} | '$end_of_table'.
|
||||
-spec dirty_fetch_next(Tab :: atom(), Cursor :: integer()) ->
|
||||
{ok, NCursor :: integer(), Item :: any()} | '$end_of_table'.
|
||||
dirty_fetch_next(Tab, Cursor) when is_atom(Tab), is_integer(Cursor) ->
|
||||
case mnesia:dirty_next(Tab, Cursor) of
|
||||
'$end_of_table' ->
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user