This commit is contained in:
anlicheng 2025-06-04 16:56:00 +08:00
parent 21c254b8b1
commit 29d99f1da4
2 changed files with 55 additions and 7 deletions

View File

@ -14,6 +14,8 @@
}).
-record(mqtt_endpoint, {
%%
version :: integer(),
host = <<>> :: binary(),
port = 0 :: integer(),
client_id = <<>> :: binary(),

View File

@ -92,8 +92,15 @@ handle_cast({forward, LocationCode, Fields, Timestamp}, State = #state{buffer =
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status = ?DISCONNECTED,
endpoint = #endpoint{title = Title, config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, client_id = ClientId}}}) ->
endpoint = #endpoint{title = Title, config = #mqtt_endpoint{version = Ver, host = Host, port = Port, username = Username, password = Password, client_id = ClientId}}}) ->
lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Title]),
ProtoVer = case Ver of
3 -> v3;
4 -> v4;
5 -> v5;
_ -> v5
end,
Opts = [
{owner, self()},
{clientid, ClientId},
@ -105,7 +112,7 @@ handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status
{keepalive, 86400},
{auto_ack, true},
{connect_timeout, 5000},
{proto_ver, v5},
{proto_ver, ProtoVer},
{retry_interval, 5000}
],
@ -126,10 +133,34 @@ handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status
handle_info({next_data, _Id, _LocationCode, _Message}, State = #state{status = ?DISCONNECTED}) ->
{keep_state, State};
%% mqtt服务器
handle_info({next_data, Id, LocationCode, Message}, State = #state{status = ?CONNECTED, conn_pid = ConnPid, buffer = Buffer, inflight = InFlight, endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic0, qos = Qos}}}) ->
Topic = re:replace(Topic0, <<"\\${location_code}">>, LocationCode, [global, {return, binary}]),
lager:debug("[mqtt_postman] will publish topic: ~p, message: ~p, qos: ~p", [Topic, Message, Qos]),
case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of
handle_info({next_data, Id, Metadata, Metric}, State = #state{status = ?CONNECTED, conn_pid = ConnPid, buffer = Buffer, inflight = InFlight,
endpoint = #endpoint{config = #mqtt_endpoint{version = 5, topic = Topic, qos = Qos}}}) ->
lager:debug("[mqtt_postman] will publish topic: ~p, message: ~p, qos: ~p", [Topic, Metric, Qos]),
Props = #{
'User-Property' => format_props(maps:to_list(Metadata))
},
case emqtt:publish(ConnPid, Topic, Props, Metric, [{qos, Qos}, {retain, true}]) of
ok ->
NBuffer = endpoint_buffer:ack(Id, Buffer),
{noreply, State#state{buffer = NBuffer}};
{ok, PacketId} ->
{noreply, State#state{inflight = maps:put(PacketId, Id, InFlight)}};
{error, Reason} ->
lager:warning("[mqtt_postman] send message to topic: ~p, get error: ~p", [Topic, Reason]),
{stop, Reason, State}
end;
%% 3.x版本附加到payload里面
handle_info({next_data, Id, Metadata, Metric}, State = #state{status = ?CONNECTED, conn_pid = ConnPid, buffer = Buffer, inflight = InFlight,
endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic, qos = Qos}}}) ->
XMetadata = base64:encode(iolist_to_binary(jiffy:encode(Metadata, [force_utf8]))),
Len = byte_size(XMetadata),
Payload = <<Len:16, XMetadata/binary, Metric/binary>>,
lager:debug("[mqtt_postman] will publish topic: ~p, message: ~p, qos: ~p", [Topic, Payload, Qos]),
case emqtt:publish(ConnPid, Topic, #{}, Payload, [{qos, Qos}, {retain, true}]) of
ok ->
NBuffer = endpoint_buffer:ack(Id, Buffer),
{noreply, State#state{buffer = NBuffer}};
@ -191,4 +222,19 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
%%%===================================================================
-spec format_props(Props :: [{Key :: any(), Val :: any()}]) -> [{Key1 :: binary(), Val1 :: binary()}].
format_props(Props) when is_list(Props) ->
format_props(Props, []).
format_props([], Acc) ->
Acc;
format_props([{Key, Val}|T], Acc) when is_binary(Key), is_integer(Val) ->
format_props(T, [{Key, integer_to_binary(Val)}|Acc]);
format_props([{Key, Val}|T], Acc) when is_binary(Key), is_float(Val) ->
NVal = list_to_binary(float_to_list(Val, [{decimals, 2}, compact])),
format_props(T, [{Key, NVal}|Acc]);
format_props([{Key, Val}|T], Acc) when is_binary(Key), is_binary(Val) ->
format_props(T, [{Key, Val}|Acc]);
format_props([_|T], Acc) ->
format_props(T, Acc).