From 29d99f1da4d5a996ee4326e7a82a88857cd177c9 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 4 Jun 2025 16:56:00 +0800 Subject: [PATCH] fix mqtt --- apps/endpoint/include/endpoint.hrl | 2 + apps/endpoint/src/endpoint_mqtt.erl | 60 +++++++++++++++++++++++++---- 2 files changed, 55 insertions(+), 7 deletions(-) diff --git a/apps/endpoint/include/endpoint.hrl b/apps/endpoint/include/endpoint.hrl index bd70d64..75af57c 100644 --- a/apps/endpoint/include/endpoint.hrl +++ b/apps/endpoint/include/endpoint.hrl @@ -14,6 +14,8 @@ }). -record(mqtt_endpoint, { + %% 协议的版本号 + version :: integer(), host = <<>> :: binary(), port = 0 :: integer(), client_id = <<>> :: binary(), diff --git a/apps/endpoint/src/endpoint_mqtt.erl b/apps/endpoint/src/endpoint_mqtt.erl index cb50c17..9ccf463 100644 --- a/apps/endpoint/src/endpoint_mqtt.erl +++ b/apps/endpoint/src/endpoint_mqtt.erl @@ -92,8 +92,15 @@ handle_cast({forward, LocationCode, Fields, Timestamp}, State = #state{buffer = {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status = ?DISCONNECTED, - endpoint = #endpoint{title = Title, config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, client_id = ClientId}}}) -> + endpoint = #endpoint{title = Title, config = #mqtt_endpoint{version = Ver, host = Host, port = Port, username = Username, password = Password, client_id = ClientId}}}) -> lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Title]), + + ProtoVer = case Ver of + 3 -> v3; + 4 -> v4; + 5 -> v5; + _ -> v5 + end, Opts = [ {owner, self()}, {clientid, ClientId}, @@ -105,7 +112,7 @@ handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status {keepalive, 86400}, {auto_ack, true}, {connect_timeout, 5000}, - {proto_ver, v5}, + {proto_ver, ProtoVer}, {retry_interval, 5000} ], @@ -126,10 +133,34 @@ handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status handle_info({next_data, _Id, _LocationCode, _Message}, State = #state{status = ?DISCONNECTED}) -> {keep_state, State}; %% 发送数据到mqtt服务器 -handle_info({next_data, Id, LocationCode, Message}, State = #state{status = ?CONNECTED, conn_pid = ConnPid, buffer = Buffer, inflight = InFlight, endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic0, qos = Qos}}}) -> - Topic = re:replace(Topic0, <<"\\${location_code}">>, LocationCode, [global, {return, binary}]), - lager:debug("[mqtt_postman] will publish topic: ~p, message: ~p, qos: ~p", [Topic, Message, Qos]), - case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of +handle_info({next_data, Id, Metadata, Metric}, State = #state{status = ?CONNECTED, conn_pid = ConnPid, buffer = Buffer, inflight = InFlight, + endpoint = #endpoint{config = #mqtt_endpoint{version = 5, topic = Topic, qos = Qos}}}) -> + + lager:debug("[mqtt_postman] will publish topic: ~p, message: ~p, qos: ~p", [Topic, Metric, Qos]), + Props = #{ + 'User-Property' => format_props(maps:to_list(Metadata)) + }, + case emqtt:publish(ConnPid, Topic, Props, Metric, [{qos, Qos}, {retain, true}]) of + ok -> + NBuffer = endpoint_buffer:ack(Id, Buffer), + {noreply, State#state{buffer = NBuffer}}; + {ok, PacketId} -> + {noreply, State#state{inflight = maps:put(PacketId, Id, InFlight)}}; + {error, Reason} -> + lager:warning("[mqtt_postman] send message to topic: ~p, get error: ~p", [Topic, Reason]), + {stop, Reason, State} + end; + +%% 3.x版本附加到payload里面 +handle_info({next_data, Id, Metadata, Metric}, State = #state{status = ?CONNECTED, conn_pid = ConnPid, buffer = Buffer, inflight = InFlight, + endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic, qos = Qos}}}) -> + + XMetadata = base64:encode(iolist_to_binary(jiffy:encode(Metadata, [force_utf8]))), + Len = byte_size(XMetadata), + Payload = <>, + + lager:debug("[mqtt_postman] will publish topic: ~p, message: ~p, qos: ~p", [Topic, Payload, Qos]), + case emqtt:publish(ConnPid, Topic, #{}, Payload, [{qos, Qos}, {retain, true}]) of ok -> NBuffer = endpoint_buffer:ack(Id, Buffer), {noreply, State#state{buffer = NBuffer}}; @@ -191,4 +222,19 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%%=================================================================== %%% Internal functions -%%%=================================================================== \ No newline at end of file +%%%=================================================================== + +-spec format_props(Props :: [{Key :: any(), Val :: any()}]) -> [{Key1 :: binary(), Val1 :: binary()}]. +format_props(Props) when is_list(Props) -> + format_props(Props, []). +format_props([], Acc) -> + Acc; +format_props([{Key, Val}|T], Acc) when is_binary(Key), is_integer(Val) -> + format_props(T, [{Key, integer_to_binary(Val)}|Acc]); +format_props([{Key, Val}|T], Acc) when is_binary(Key), is_float(Val) -> + NVal = list_to_binary(float_to_list(Val, [{decimals, 2}, compact])), + format_props(T, [{Key, NVal}|Acc]); +format_props([{Key, Val}|T], Acc) when is_binary(Key), is_binary(Val) -> + format_props(T, [{Key, Val}|Acc]); +format_props([_|T], Acc) -> + format_props(T, Acc).