This commit is contained in:
anlicheng 2023-08-08 10:27:00 +08:00
parent 1122d679a2
commit 8a898192a9
2 changed files with 20 additions and 10 deletions

View File

@ -142,19 +142,28 @@ handle_event(cast, {reload, NEndpoint = #endpoint{name = Name}}, connected, Stat
handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{tab_name = TabName, window_size = WindowSize, flight_num = FlightNum, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}}) ->
try
Body = if
Result = if
is_function(MapperFun, 2) ->
MapperFun(LocationCode, Fields);
is_function(MapperFun, 3) ->
MapperFun(LocationCode, Fields, Timestamp)
end,
mnesia_queue:insert(TabName, #north_data{location_code = LocationCode, body = Body}),
%%
Actions = case StateName =:= connected andalso FlightNum < WindowSize of
true -> [{next_event, info, fetch_next}];
false -> []
end,
{keep_state, State, Actions}
case Result of
{ok, Body} ->
mnesia_queue:insert(TabName, #north_data{location_code = LocationCode, body = Body}),
%%
Actions = case StateName =:= connected andalso FlightNum < WindowSize of
true -> [{next_event, info, fetch_next}];
false -> []
end,
{keep_state, State, Actions};
{error, Error} ->
lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p", [Name, Error]),
{keep_state, State};
ignore ->
lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore", [Name]),
{keep_state, State}
end
catch _:Reason ->
lager:debug("[iot_endpoint] forward endpoint: ~p, mapper data get error: ~p", [Name, Reason]),
{keep_state, State}
@ -259,7 +268,8 @@ handle_event(EventType, Event, StateName, State) ->
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_statem terminates with
%% Reason. The return value is ignored.
terminate(_Reason, _StateName, _State) ->
terminate(Reason, _StateName, _State) ->
lager:debug("[iot_endpoint] terminate with reason: ~p", [Reason]),
ok.
%% @private

View File

@ -100,7 +100,7 @@ handle_info({puback, Packet = #{packet_id := PacketId}}, State = #state{parent_p
end;
%%
handle_info({post, ReceiverPid, #north_data{id = Id, location_code = LocationCode, body = Message}}, State = #state{parent_pid = ParentPid, conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) ->
handle_info({post, ReceiverPid, #north_data{id = Id, location_code = LocationCode, body = Message}}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) ->
Topic = re:replace(Topic0, <<"\\${location_code}">>, LocationCode, [global, {return, binary}]),
lager:debug("[mqtt_postman] will publish topic: ~p, message: ~p, qos: ~p", [Topic, Message, Qos]),
case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of