fix mapper
This commit is contained in:
parent
c899626954
commit
a44c898c4f
@ -107,6 +107,15 @@
|
||||
|
||||
%% 北向数据
|
||||
-record(north_data, {
|
||||
id = 0 :: integer(),
|
||||
location_code :: binary(),
|
||||
%% 数据库类型的endpoint, 可以返回list: [{K, V}, {K1, V1}]
|
||||
fields :: [{K :: binary(), V :: any()}],
|
||||
timestamp = 0 :: integer()
|
||||
}).
|
||||
|
||||
%% 发送数据
|
||||
-record(post_data, {
|
||||
id = 0 :: integer(),
|
||||
location_code :: binary(),
|
||||
%% 数据库类型的endpoint, 可以返回list: [{K, V}, {K1, V1}]
|
||||
|
||||
@ -140,34 +140,14 @@ handle_event(cast, {reload, NEndpoint = #endpoint{name = Name}}, connected, Stat
|
||||
{next_state, disconnected, State#state{endpoint = NEndpoint, timer_map = maps:new(), postman_pid = undefined}}
|
||||
end;
|
||||
|
||||
handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{tab_name = TabName, window_size = WindowSize, flight_num = FlightNum, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}}) ->
|
||||
try
|
||||
Result = if
|
||||
is_function(MapperFun, 2) ->
|
||||
MapperFun(LocationCode, Fields);
|
||||
is_function(MapperFun, 3) ->
|
||||
MapperFun(LocationCode, Fields, Timestamp)
|
||||
end,
|
||||
case Result of
|
||||
{ok, Body} ->
|
||||
mnesia_queue:insert(TabName, #north_data{location_code = LocationCode, body = Body}),
|
||||
handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{tab_name = TabName, window_size = WindowSize, flight_num = FlightNum}) ->
|
||||
mnesia_queue:insert(TabName, #north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}),
|
||||
%% 避免不必要的内部消息
|
||||
Actions = case StateName =:= connected andalso FlightNum < WindowSize of
|
||||
true -> [{next_event, info, fetch_next}];
|
||||
false -> []
|
||||
end,
|
||||
{keep_state, State, Actions};
|
||||
{error, Error} ->
|
||||
lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p", [Name, Error]),
|
||||
{keep_state, State};
|
||||
ignore ->
|
||||
lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore", [Name]),
|
||||
{keep_state, State}
|
||||
end
|
||||
catch _:Reason ->
|
||||
lager:debug("[iot_endpoint] forward endpoint: ~p, mapper data get error: ~p", [Name, Reason]),
|
||||
{keep_state, State}
|
||||
end;
|
||||
|
||||
%% 触发读取下一条数据
|
||||
handle_event(info, fetch_next, disconnected, State = #state{endpoint = #endpoint{name = Name}}) ->
|
||||
@ -175,16 +155,29 @@ handle_event(info, fetch_next, disconnected, State = #state{endpoint = #endpoint
|
||||
{keep_state, State};
|
||||
handle_event(info, fetch_next, connected, State = #state{flight_num = FlightNum, window_size = WindowSize}) when FlightNum >= WindowSize ->
|
||||
{keep_state, State};
|
||||
handle_event(info, fetch_next, connected, State = #state{tab_name = TabName, cursor = Cursor, endpoint = #endpoint{name = Name}, postman_pid = PostmanPid, timer_map = TimerMap, flight_num = FlightNum}) ->
|
||||
handle_event(info, fetch_next, connected, State = #state{tab_name = TabName, cursor = Cursor, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}, postman_pid = PostmanPid, timer_map = TimerMap, flight_num = FlightNum}) ->
|
||||
case catch mnesia_queue:dirty_fetch_next(TabName, Cursor) of
|
||||
{ok, NCursor, NorthData = #north_data{id = Id}} ->
|
||||
{ok, NCursor, NorthData = #north_data{id = Id, location_code = LocationCode}} ->
|
||||
lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]),
|
||||
case safe_invoke_mapper(MapperFun, NorthData) of
|
||||
{ok, Body} ->
|
||||
PostData = #post_data{id = Id, body = Body, location_code = LocationCode},
|
||||
|
||||
PostmanPid ! {post, self(), NorthData},
|
||||
%% 重发机制
|
||||
PostmanPid ! {post, self(), PostData},
|
||||
%% 重发机制, 在发送的过程中mapper可能会改变
|
||||
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}),
|
||||
|
||||
{keep_state, State#state{cursor = NCursor, timer_map = maps:put(Id, TimerRef, TimerMap), flight_num = FlightNum + 1}};
|
||||
|
||||
{error, Error} ->
|
||||
lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p", [Name, Error]),
|
||||
mnesia_queue:delete(TabName, Id),
|
||||
{keep_state, State};
|
||||
ignore ->
|
||||
lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore", [Name]),
|
||||
mnesia_queue:delete(TabName, Id),
|
||||
{keep_state, State}
|
||||
end;
|
||||
'$end_of_table' ->
|
||||
lager:debug("[iot_endpoint] endpoint:~p, fetch_next failed $end_of_table", [Name]),
|
||||
{keep_state, State};
|
||||
@ -204,14 +197,26 @@ handle_event(info, {ack, Id}, StateName, State = #state{tab_name = TabName, endp
|
||||
{keep_state, State#state{timer_map = remove_timer(Id, TimerMap), acc_num = AccNum + 1, flight_num = FlightNum - 1}, Actions};
|
||||
|
||||
%% 收到重发过期请求
|
||||
handle_event(info, {timeout, _, {repost_ticker, NorthData = #north_data{id = Id}}}, connected, State = #state{endpoint = #endpoint{name = Name}, postman_pid = PostmanPid, timer_map = TimerMap}) ->
|
||||
handle_event(info, {timeout, _, {repost_ticker, NorthData = #north_data{id = Id, location_code = LocationCode}}}, connected, State = #state{tab_name = TabName, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}, postman_pid = PostmanPid, timer_map = TimerMap}) ->
|
||||
lager:debug("[iot_endpoint] endpoint: ~p, repost data: ~p", [Name, Id]),
|
||||
PostmanPid ! {post, self(), NorthData},
|
||||
%% 5秒后重发
|
||||
case safe_invoke_mapper(MapperFun, NorthData) of
|
||||
{ok, Body} ->
|
||||
PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}},
|
||||
%% 重发机制, 在发送的过程中mapper可能会改变
|
||||
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}),
|
||||
|
||||
{keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}};
|
||||
|
||||
{error, Error} ->
|
||||
lager:debug("[iot_endpoint] endpoint: ~p, repost mapper get error: ~p", [Name, Error]),
|
||||
mnesia_queue:delete(TabName, Id),
|
||||
{keep_state, State};
|
||||
ignore ->
|
||||
lager:debug("[iot_endpoint] endpoint: ~p, repost mapper ignore", [Name]),
|
||||
mnesia_queue:delete(TabName, Id),
|
||||
{keep_state, State}
|
||||
end;
|
||||
|
||||
%% 离线时,忽略超时逻辑
|
||||
handle_event(info, {timeout, _, {repost_ticker, _}}, disconnected, State) ->
|
||||
{keep_state, State};
|
||||
@ -333,3 +338,15 @@ create_postman(#endpoint{config = #mysql_endpoint{host = Host, port = Port, user
|
||||
broker_postman:start_link(self(), mysql_postman, WorkerArgs, PoolSize);
|
||||
create_postman(#endpoint{}) ->
|
||||
throw(<<"not supported">>).
|
||||
|
||||
safe_invoke_mapper(MapperFun, #north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}) ->
|
||||
try
|
||||
if
|
||||
is_function(MapperFun, 2) ->
|
||||
MapperFun(LocationCode, Fields);
|
||||
is_function(MapperFun, 3) ->
|
||||
MapperFun(LocationCode, Fields, Timestamp)
|
||||
end
|
||||
catch _:Error ->
|
||||
{error, Error}
|
||||
end.
|
||||
@ -73,7 +73,7 @@ handle_cast(_Request, State = #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info({post, ReceiverPid, #north_data{id = Id, body = Body}}, State = #state{url = Url}) ->
|
||||
handle_info({post, ReceiverPid, #post_data{id = Id, body = Body}}, State = #state{url = Url}) ->
|
||||
Headers = [
|
||||
{<<"content-type">>, <<"application/json">>}
|
||||
],
|
||||
|
||||
@ -100,7 +100,7 @@ handle_info({puback, Packet = #{packet_id := PacketId}}, State = #state{parent_p
|
||||
end;
|
||||
|
||||
%% 转发信息
|
||||
handle_info({post, ReceiverPid, #north_data{id = Id, location_code = LocationCode, body = Message}}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) ->
|
||||
handle_info({post, ReceiverPid, #post_data{id = Id, location_code = LocationCode, body = Message}}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) ->
|
||||
Topic = re:replace(Topic0, <<"\\${location_code}">>, LocationCode, [global, {return, binary}]),
|
||||
lager:debug("[mqtt_postman] will publish topic: ~p, message: ~p, qos: ~p", [Topic, Message, Qos]),
|
||||
case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of
|
||||
|
||||
@ -80,7 +80,7 @@ handle_cast(_Request, State = #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info({post, ReceiverPid, #north_data{id = Id, body = Fields}}, State = #state{mysql_pid = ConnPid, table = Table}) when is_list(Fields) ->
|
||||
handle_info({post, ReceiverPid, #post_data{id = Id, body = Fields}}, State = #state{mysql_pid = ConnPid, table = Table}) when is_list(Fields) ->
|
||||
case catch mysql_provider:insert(ConnPid, Table, Fields, false) of
|
||||
ok ->
|
||||
ReceiverPid ! {ack, Id};
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user