From a2ab8f6591c4fc1a0bfd1ae91bdc877caf158374 Mon Sep 17 00:00:00 2001 From: anlicheng Date: Mon, 24 Jul 2023 14:48:15 +0800 Subject: [PATCH] fix emqtt --- apps/iot/include/emqtt.hrl | 6 ++++++ apps/iot/src/emqtt/emqtt.erl | 4 ++++ apps/iot/src/emqtt/emqtt_frame.erl | 9 +++++---- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/apps/iot/include/emqtt.hrl b/apps/iot/include/emqtt.hrl index 1fd4a76..8caa08f 100644 --- a/apps/iot/include/emqtt.hrl +++ b/apps/iot/include/emqtt.hrl @@ -399,6 +399,12 @@ reason_code = 0} }). +-define(PUBREC_PACKET(PacketId, ReasonCode), + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC}, + variable = #mqtt_packet_puback{packet_id = PacketId, + reason_code = ReasonCode} + }). + -define(PUBREC_PACKET(PacketId, ReasonCode), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC}, variable = #mqtt_packet_puback{packet_id = PacketId, diff --git a/apps/iot/src/emqtt/emqtt.erl b/apps/iot/src/emqtt/emqtt.erl index 8ef6df9..999d27a 100644 --- a/apps/iot/src/emqtt/emqtt.erl +++ b/apps/iot/src/emqtt/emqtt.erl @@ -790,6 +790,10 @@ connected(cast, ?PUBREC_PACKET(PacketId), State = #state{inflight = Inflight}) - end, send_puback(?PUBREL_PACKET(PacketId), NState); +connected(cast, ?PUBREC_PACKET(PacketId, ReasonCode), State) -> + lager:notice("[emqtt] Duplicated PUBREC Packet: ~p, reason_code: ~p, client_id: ~p", [PacketId, ReasonCode, State#state.clientid]), + keep_state_and_data; + %%TODO::... if auto_ack is false, should we take PacketId from the map? connected(cast, ?PUBREL_PACKET(PacketId), State = #state{awaiting_rel = AwaitingRel, auto_ack = AutoAck}) -> diff --git a/apps/iot/src/emqtt/emqtt_frame.erl b/apps/iot/src/emqtt/emqtt_frame.erl index 2b78895..96393f3 100644 --- a/apps/iot/src/emqtt/emqtt_frame.erl +++ b/apps/iot/src/emqtt/emqtt_frame.erl @@ -98,8 +98,7 @@ parse_remaining_len(<<>>, Header, Options) -> parse_remaining_len(Rest, Header, Options) -> parse_remaining_len(Rest, Header, 1, 0, Options). -parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) - when Length > MaxSize -> +parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) when Length > MaxSize -> error(frame_too_large); parse_remaining_len(<<>>, Header, Multiplier, Length, Options) -> {more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end}; @@ -119,8 +118,10 @@ parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options = #{max_size := MaxSize}) -> FrameLen = Value + Len * Multiplier, if - FrameLen > MaxSize -> error(frame_too_large); - true -> parse_frame(Rest, Header, FrameLen, Options) + FrameLen > MaxSize -> + error(frame_too_large); + true -> + parse_frame(Rest, Header, FrameLen, Options) end. parse_frame(Bin, Header, 0, Options) ->