From a44c898c4fe88e0077f58af03c1b5a11fc775a12 Mon Sep 17 00:00:00 2001 From: anlicheng Date: Wed, 9 Aug 2023 11:56:37 +0800 Subject: [PATCH] fix mapper --- apps/iot/include/iot.hrl | 9 +++ apps/iot/src/iot_endpoint.erl | 97 +++++++++++++++----------- apps/iot/src/postman/http_postman.erl | 2 +- apps/iot/src/postman/mqtt_postman.erl | 2 +- apps/iot/src/postman/mysql_postman.erl | 2 +- 5 files changed, 69 insertions(+), 43 deletions(-) diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 171e48c..fda3d26 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -107,6 +107,15 @@ %% 北向数据 -record(north_data, { + id = 0 :: integer(), + location_code :: binary(), + %% 数据库类型的endpoint, 可以返回list: [{K, V}, {K1, V1}] + fields :: [{K :: binary(), V :: any()}], + timestamp = 0 :: integer() +}). + +%% 发送数据 +-record(post_data, { id = 0 :: integer(), location_code :: binary(), %% 数据库类型的endpoint, 可以返回list: [{K, V}, {K1, V1}] diff --git a/apps/iot/src/iot_endpoint.erl b/apps/iot/src/iot_endpoint.erl index 9af122d..982e3e5 100644 --- a/apps/iot/src/iot_endpoint.erl +++ b/apps/iot/src/iot_endpoint.erl @@ -140,34 +140,14 @@ handle_event(cast, {reload, NEndpoint = #endpoint{name = Name}}, connected, Stat {next_state, disconnected, State#state{endpoint = NEndpoint, timer_map = maps:new(), postman_pid = undefined}} end; -handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{tab_name = TabName, window_size = WindowSize, flight_num = FlightNum, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}}) -> - try - Result = if - is_function(MapperFun, 2) -> - MapperFun(LocationCode, Fields); - is_function(MapperFun, 3) -> - MapperFun(LocationCode, Fields, Timestamp) - end, - case Result of - {ok, Body} -> - mnesia_queue:insert(TabName, #north_data{location_code = LocationCode, body = Body}), - %% 避免不必要的内部消息 - Actions = case StateName =:= connected andalso FlightNum < WindowSize of - true -> [{next_event, info, fetch_next}]; - false -> [] - end, - {keep_state, State, Actions}; - {error, Error} -> - lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p", [Name, Error]), - {keep_state, State}; - ignore -> - lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore", [Name]), - {keep_state, State} - end - catch _:Reason -> - lager:debug("[iot_endpoint] forward endpoint: ~p, mapper data get error: ~p", [Name, Reason]), - {keep_state, State} - end; +handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{tab_name = TabName, window_size = WindowSize, flight_num = FlightNum}) -> + mnesia_queue:insert(TabName, #north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}), + %% 避免不必要的内部消息 + Actions = case StateName =:= connected andalso FlightNum < WindowSize of + true -> [{next_event, info, fetch_next}]; + false -> [] + end, + {keep_state, State, Actions}; %% 触发读取下一条数据 handle_event(info, fetch_next, disconnected, State = #state{endpoint = #endpoint{name = Name}}) -> @@ -175,16 +155,29 @@ handle_event(info, fetch_next, disconnected, State = #state{endpoint = #endpoint {keep_state, State}; handle_event(info, fetch_next, connected, State = #state{flight_num = FlightNum, window_size = WindowSize}) when FlightNum >= WindowSize -> {keep_state, State}; -handle_event(info, fetch_next, connected, State = #state{tab_name = TabName, cursor = Cursor, endpoint = #endpoint{name = Name}, postman_pid = PostmanPid, timer_map = TimerMap, flight_num = FlightNum}) -> +handle_event(info, fetch_next, connected, State = #state{tab_name = TabName, cursor = Cursor, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}, postman_pid = PostmanPid, timer_map = TimerMap, flight_num = FlightNum}) -> case catch mnesia_queue:dirty_fetch_next(TabName, Cursor) of - {ok, NCursor, NorthData = #north_data{id = Id}} -> + {ok, NCursor, NorthData = #north_data{id = Id, location_code = LocationCode}} -> lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]), + case safe_invoke_mapper(MapperFun, NorthData) of + {ok, Body} -> + PostData = #post_data{id = Id, body = Body, location_code = LocationCode}, - PostmanPid ! {post, self(), NorthData}, - %% 重发机制 - TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), + PostmanPid ! {post, self(), PostData}, + %% 重发机制, 在发送的过程中mapper可能会改变 + TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), - {keep_state, State#state{cursor = NCursor, timer_map = maps:put(Id, TimerRef, TimerMap), flight_num = FlightNum + 1}}; + {keep_state, State#state{cursor = NCursor, timer_map = maps:put(Id, TimerRef, TimerMap), flight_num = FlightNum + 1}}; + + {error, Error} -> + lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p", [Name, Error]), + mnesia_queue:delete(TabName, Id), + {keep_state, State}; + ignore -> + lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore", [Name]), + mnesia_queue:delete(TabName, Id), + {keep_state, State} + end; '$end_of_table' -> lager:debug("[iot_endpoint] endpoint:~p, fetch_next failed $end_of_table", [Name]), {keep_state, State}; @@ -204,13 +197,25 @@ handle_event(info, {ack, Id}, StateName, State = #state{tab_name = TabName, endp {keep_state, State#state{timer_map = remove_timer(Id, TimerMap), acc_num = AccNum + 1, flight_num = FlightNum - 1}, Actions}; %% 收到重发过期请求 -handle_event(info, {timeout, _, {repost_ticker, NorthData = #north_data{id = Id}}}, connected, State = #state{endpoint = #endpoint{name = Name}, postman_pid = PostmanPid, timer_map = TimerMap}) -> +handle_event(info, {timeout, _, {repost_ticker, NorthData = #north_data{id = Id, location_code = LocationCode}}}, connected, State = #state{tab_name = TabName, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}, postman_pid = PostmanPid, timer_map = TimerMap}) -> lager:debug("[iot_endpoint] endpoint: ~p, repost data: ~p", [Name, Id]), - PostmanPid ! {post, self(), NorthData}, - %% 5秒后重发 - TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), + case safe_invoke_mapper(MapperFun, NorthData) of + {ok, Body} -> + PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}}, + %% 重发机制, 在发送的过程中mapper可能会改变 + TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), - {keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}}; + {keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}}; + + {error, Error} -> + lager:debug("[iot_endpoint] endpoint: ~p, repost mapper get error: ~p", [Name, Error]), + mnesia_queue:delete(TabName, Id), + {keep_state, State}; + ignore -> + lager:debug("[iot_endpoint] endpoint: ~p, repost mapper ignore", [Name]), + mnesia_queue:delete(TabName, Id), + {keep_state, State} + end; %% 离线时,忽略超时逻辑 handle_event(info, {timeout, _, {repost_ticker, _}}, disconnected, State) -> @@ -332,4 +337,16 @@ create_postman(#endpoint{config = #mysql_endpoint{host = Host, port = Port, user ], broker_postman:start_link(self(), mysql_postman, WorkerArgs, PoolSize); create_postman(#endpoint{}) -> - throw(<<"not supported">>). \ No newline at end of file + throw(<<"not supported">>). + +safe_invoke_mapper(MapperFun, #north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}) -> + try + if + is_function(MapperFun, 2) -> + MapperFun(LocationCode, Fields); + is_function(MapperFun, 3) -> + MapperFun(LocationCode, Fields, Timestamp) + end + catch _:Error -> + {error, Error} + end. \ No newline at end of file diff --git a/apps/iot/src/postman/http_postman.erl b/apps/iot/src/postman/http_postman.erl index d3d9fef..22accb7 100644 --- a/apps/iot/src/postman/http_postman.erl +++ b/apps/iot/src/postman/http_postman.erl @@ -73,7 +73,7 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info({post, ReceiverPid, #north_data{id = Id, body = Body}}, State = #state{url = Url}) -> +handle_info({post, ReceiverPid, #post_data{id = Id, body = Body}}, State = #state{url = Url}) -> Headers = [ {<<"content-type">>, <<"application/json">>} ], diff --git a/apps/iot/src/postman/mqtt_postman.erl b/apps/iot/src/postman/mqtt_postman.erl index b9a868c..b9ee752 100644 --- a/apps/iot/src/postman/mqtt_postman.erl +++ b/apps/iot/src/postman/mqtt_postman.erl @@ -100,7 +100,7 @@ handle_info({puback, Packet = #{packet_id := PacketId}}, State = #state{parent_p end; %% 转发信息 -handle_info({post, ReceiverPid, #north_data{id = Id, location_code = LocationCode, body = Message}}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) -> +handle_info({post, ReceiverPid, #post_data{id = Id, location_code = LocationCode, body = Message}}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) -> Topic = re:replace(Topic0, <<"\\${location_code}">>, LocationCode, [global, {return, binary}]), lager:debug("[mqtt_postman] will publish topic: ~p, message: ~p, qos: ~p", [Topic, Message, Qos]), case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of diff --git a/apps/iot/src/postman/mysql_postman.erl b/apps/iot/src/postman/mysql_postman.erl index 031a564..9217397 100644 --- a/apps/iot/src/postman/mysql_postman.erl +++ b/apps/iot/src/postman/mysql_postman.erl @@ -80,7 +80,7 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info({post, ReceiverPid, #north_data{id = Id, body = Fields}}, State = #state{mysql_pid = ConnPid, table = Table}) when is_list(Fields) -> +handle_info({post, ReceiverPid, #post_data{id = Id, body = Fields}}, State = #state{mysql_pid = ConnPid, table = Table}) when is_list(Fields) -> case catch mysql_provider:insert(ConnPid, Table, Fields, false) of ok -> ReceiverPid ! {ack, Id};