diff --git a/apps/iot/src/iot_endpoint.erl b/apps/iot/src/iot_endpoint.erl index 66e54d2..9af122d 100644 --- a/apps/iot/src/iot_endpoint.erl +++ b/apps/iot/src/iot_endpoint.erl @@ -142,19 +142,28 @@ handle_event(cast, {reload, NEndpoint = #endpoint{name = Name}}, connected, Stat handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{tab_name = TabName, window_size = WindowSize, flight_num = FlightNum, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}}) -> try - Body = if + Result = if is_function(MapperFun, 2) -> MapperFun(LocationCode, Fields); is_function(MapperFun, 3) -> MapperFun(LocationCode, Fields, Timestamp) end, - mnesia_queue:insert(TabName, #north_data{location_code = LocationCode, body = Body}), - %% 避免不必要的内部消息 - Actions = case StateName =:= connected andalso FlightNum < WindowSize of - true -> [{next_event, info, fetch_next}]; - false -> [] - end, - {keep_state, State, Actions} + case Result of + {ok, Body} -> + mnesia_queue:insert(TabName, #north_data{location_code = LocationCode, body = Body}), + %% 避免不必要的内部消息 + Actions = case StateName =:= connected andalso FlightNum < WindowSize of + true -> [{next_event, info, fetch_next}]; + false -> [] + end, + {keep_state, State, Actions}; + {error, Error} -> + lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p", [Name, Error]), + {keep_state, State}; + ignore -> + lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore", [Name]), + {keep_state, State} + end catch _:Reason -> lager:debug("[iot_endpoint] forward endpoint: ~p, mapper data get error: ~p", [Name, Reason]), {keep_state, State} @@ -259,7 +268,8 @@ handle_event(EventType, Event, StateName, State) -> %% terminate. It should be the opposite of Module:init/1 and do any %% necessary cleaning up. When it returns, the gen_statem terminates with %% Reason. The return value is ignored. -terminate(_Reason, _StateName, _State) -> +terminate(Reason, _StateName, _State) -> + lager:debug("[iot_endpoint] terminate with reason: ~p", [Reason]), ok. %% @private diff --git a/apps/iot/src/postman/mqtt_postman.erl b/apps/iot/src/postman/mqtt_postman.erl index 4826341..b9a868c 100644 --- a/apps/iot/src/postman/mqtt_postman.erl +++ b/apps/iot/src/postman/mqtt_postman.erl @@ -100,7 +100,7 @@ handle_info({puback, Packet = #{packet_id := PacketId}}, State = #state{parent_p end; %% 转发信息 -handle_info({post, ReceiverPid, #north_data{id = Id, location_code = LocationCode, body = Message}}, State = #state{parent_pid = ParentPid, conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) -> +handle_info({post, ReceiverPid, #north_data{id = Id, location_code = LocationCode, body = Message}}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) -> Topic = re:replace(Topic0, <<"\\${location_code}">>, LocationCode, [global, {return, binary}]), lager:debug("[mqtt_postman] will publish topic: ~p, message: ~p, qos: ~p", [Topic, Message, Qos]), case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of