diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 04a9300..dade9f3 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -95,7 +95,7 @@ %% 北向数据 -record(north_data, { - ref :: reference(), + id :: integer(), location_code :: binary(), body :: binary() }). \ No newline at end of file diff --git a/apps/iot/src/http_handler/endpoint_handler.erl b/apps/iot/src/http_handler/endpoint_handler.erl index 6efa354..ba77a9a 100644 --- a/apps/iot/src/http_handler/endpoint_handler.erl +++ b/apps/iot/src/http_handler/endpoint_handler.erl @@ -79,7 +79,13 @@ handle_request("POST", "/endpoint/update", _, Params = #{<<"name">> := Name}) wh handle_request("POST", "/endpoint/delete", _, #{<<"name">> := Name}) when is_binary(Name) -> case mnesia_endpoint:delete(Name) of ok -> - iot_endpoint_sup:delete_endpoint(Name), + case iot_endpoint:get_pid(Name) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"endpoint not found">>)}; + Pid -> + iot_endpoint:clean_up(Pid), + iot_endpoint_sup:delete_endpoint(Name) + end, {ok, 200, iot_util:json_data(<<"success">>)}; {error, Reason} -> diff --git a/apps/iot/src/iot_endpoint.erl b/apps/iot/src/iot_endpoint.erl index 97e2ba1..b58494e 100644 --- a/apps/iot/src/iot_endpoint.erl +++ b/apps/iot/src/iot_endpoint.erl @@ -14,7 +14,7 @@ %% API -export([start_link/2]). --export([get_name/1, get_pid/1, forward/3, get_stat/1, reload/2]). +-export([get_name/1, get_pid/1, forward/3, get_stat/1, reload/2, clean_up/1]). %% gen_statem callbacks -export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). @@ -24,15 +24,19 @@ -record(state, { endpoint :: #endpoint{}, - postman :: undefined | {atom(), pid()}, - %% 发送后未确认的消息 - ack_map = #{}, + postman_pid :: undefined | pid(), + %% 队列数据库名, 写入到对端都有可能失败,因此缓存队列需要自己维护 + tab_name :: atom(), + %% 当前数据的游标, #north_data的id + cursor = 0 :: integer(), %% 定时器 timer_map = #{}, + %% 窗口大小,允许最大的未确认消息数 + window_size = 10, + %% 未确认的消息数 + flight_num = 0, %% 记录成功处理的消息数 - acc_num = 0, - %% 当postman进程异常时,需要建立缓存区 - q = queue:new() + acc_num = 0 }). %%%=================================================================== @@ -62,6 +66,10 @@ reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) -> get_stat(Pid) when is_pid(Pid) -> gen_statem:call(Pid, get_stat, 5000). +-spec clean_up(Pid :: pid()) -> ok. +clean_up(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, clean_up, 5000). + %% @doc Creates a gen_statem process which calls Module:init/1 to %% initialize. To ensure a synchronized start-up procedure, this %% function does not return until Module:init/1 has returned. @@ -76,12 +84,17 @@ start_link(Name, Endpoint = #endpoint{}) -> %% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or %% gen_statem:start_link/[3,4], this function is called by the new %% process to initialize. -init([Endpoint]) -> +init([Endpoint = #endpoint{name = Name}]) -> erlang:process_flag(trap_exit, true), - %% 创建转发器 - erlang:start_timer(0, self(), recreate_postman), - {ok, disconnected, #state{endpoint = Endpoint, postman = undefined}}. + %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 + erlang:start_timer(0, self(), create_postman), + %% 创建自己的队列数据库表 + TabName = binary_to_atom(<<"queue_data:", Name/binary>>), + %% 如果表已经存在需要等待表数据加载完成 + mnesia_queue:ensure_queue(TabName), + + {ok, disconnected, #state{endpoint = Endpoint, tab_name = TabName, postman_pid = undefined}}. %% @private %% @doc This function is called by a gen_statem when it needs to find out @@ -97,11 +110,11 @@ callback_mode() -> %% 重新加载新的终端配置 handle_event(cast, {reload, NEndpoint}, disconnected, State = #state{endpoint = Endpoint}) -> - lager:warning("[iot_endpoint] reload endpoint, old: ~p, new: ~p", [Endpoint, Endpoint, NEndpoint]), + lager:warning("[iot_endpoint] state_name: disconnected, reload endpoint, old: ~p, new: ~p", [Endpoint, Endpoint, NEndpoint]), {keep_state, State#state{endpoint = NEndpoint}}; -handle_event(cast, {reload, NEndpoint = #endpoint{name = Name}}, connected, State = #state{endpoint = Endpoint, postman = {_, PostmanPid}}) -> - lager:debug("[iot_endpoint] reload endpoint, old: ~p~n, new: ~p", [Endpoint, NEndpoint]), +handle_event(cast, {reload, NEndpoint = #endpoint{name = Name}}, connected, State = #state{endpoint = Endpoint, timer_map = TimerMap, postman_pid = PostmanPid}) -> + lager:debug("[iot_endpoint] state_name: connected, reload endpoint, old: ~p, new: ~p", [Endpoint, NEndpoint]), case mnesia_endpoint:config_equals(NEndpoint#endpoint.config, Endpoint#endpoint.config) of true -> lager:debug("[iot_endpoint] reload endpoint: ~p, config equals", [Name]), @@ -110,99 +123,112 @@ handle_event(cast, {reload, NEndpoint = #endpoint{name = Name}}, connected, Stat %% 解除和postman的link关系 unlink(PostmanPid), %% 关闭postman进程 - PostmanPid ! stop, + catch PostmanPid ! stop, %% 未确认的消息需要暂存 - NState = stash(State), + lists:foreach(fun({_, TimerRef}) -> catch erlang:cancel_timer(TimerRef) end, maps:to_list(TimerMap)), %% 重新建立新的postman - erlang:start_timer(0, self(), recreate_postman), + erlang:start_timer(0, self(), create_postman), - {next_state, disconnected, NState#state{endpoint = NEndpoint, postman = undefined}} + {next_state, disconnected, State#state{endpoint = NEndpoint, timer_map = maps:new(), postman_pid = undefined}} end; -handle_event(cast, {forward, LocationCode, Data}, _, State = #state{endpoint = Endpoint = #endpoint{mapper_fun = MapperFun}}) -> +handle_event(cast, {forward, LocationCode, Data}, StateName, State = #state{tab_name = TabName, window_size = WindowSize, flight_num = FlightNum, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}}) -> try Body = MapperFun(LocationCode, Data), - NorthData = #north_data{ref = make_ref(), location_code = LocationCode, body = Body}, - gen_statem:cast(self(), {post, NorthData}) + mnesia_queue:insert(TabName, #north_data{location_code = LocationCode, body = Body}), + %% 避免不必要的内部消息 + Actions = case StateName =:= connected andalso FlightNum < WindowSize of + true -> [{next_event, info, fetch_next}]; + false -> [] + end, + {keep_state, State, Actions} catch _:Reason -> - lager:debug("[iot_endpoint] endpoint: ~p, mapper data get error: ~p", [Endpoint, Reason]) - end, + lager:debug("[iot_endpoint] forward endpoint: ~p, mapper data get error: ~p", [Name, Reason]), + {keep_state, State} + end; + +%% 触发读取下一条数据 +handle_event(info, fetch_next, disconnected, State = #state{endpoint = #endpoint{name = Name}}) -> + lager:debug("[iot_endpoint] fetch_next endpoint: ~p, postman offline, data in queue", [Name]), {keep_state, State}; +handle_event(info, fetch_next, connected, State = #state{flight_num = FlightNum, window_size = WindowSize}) when FlightNum >= WindowSize -> + {keep_state, State}; +handle_event(info, fetch_next, connected, State = #state{tab_name = TabName, cursor = Cursor, endpoint = #endpoint{name = Name}, postman_pid = PostmanPid, timer_map = TimerMap, flight_num = FlightNum}) -> + case catch mnesia_queue:dirty_fetch_next(TabName, Cursor) of + {ok, NCursor, NorthData = #north_data{id = Id}} -> + lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]), -handle_event(cast, {post, NorthData = #north_data{ref = Ref}}, connected, State = #state{endpoint = Endpoint, postman = {_, PostmanPid}, ack_map = AckMap, timer_map = TimerMap}) -> - lager:debug("[iot_endpoint] endpoint: ~p, postman online, north data is: ~p", [Endpoint, NorthData]), + PostmanPid ! {post, NorthData}, + %% 重发机制 + TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), - PostmanPid ! {post, NorthData}, - %% 重发机制 - TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), - - {keep_state, State#state{ack_map = maps:put(Ref, NorthData, AckMap), timer_map = maps:put(Ref, TimerRef, TimerMap)}}; - -handle_event(cast, {post, NorthData}, disconnected, State = #state{endpoint = Endpoint, q = Q}) -> - lager:debug("[iot_endpoint] endpoint: ~p, postman offline, data in queue", [Endpoint]), - {keep_state, State#state{q = queue:in(NorthData, Q)}}; + {keep_state, State#state{cursor = NCursor, timer_map = maps:put(Id, TimerRef, TimerMap), flight_num = FlightNum + 1}}; + '$end_of_table' -> + lager:debug("[iot_endpoint] endpoint:~p, fetch_next failed $end_of_table", [Name]), + {keep_state, State}; + Error -> + lager:debug("[iot_endpoint] endpoint:~p, fetch_next get error: ~p", [Name, Error]), + {keep_state, State} + end; %% 收到确认消息 -handle_event(info, {ack, Ref}, _, State = #state{ack_map = AckMap, timer_map = TimerMap, acc_num = AccNum}) -> - NAckMap = maps:remove(Ref, AckMap), - NTimerMap = case maps:take(Ref, TimerMap) of - error -> - TimerMap; - {TimerRef, TimerMap0} -> - catch erlang:cancel_timer(TimerRef), - TimerMap0 - end, - {keep_state, State#state{ack_map = NAckMap, timer_map = NTimerMap, acc_num = AccNum + 1}}; +handle_event(info, {ack, Id}, StateName, State = #state{tab_name = TabName, endpoint = #endpoint{name = Name}, timer_map = TimerMap, acc_num = AccNum, flight_num = FlightNum}) -> + lager:debug("[iot_endpoint] endpoint: ~p, get ack: ~p", [Name, Id]), + mnesia_queue:delete(TabName, Id), + Actions = case StateName =:= connected of + true -> [{next_event, info, fetch_next}]; + false -> [] + end, + {keep_state, State#state{timer_map = remove_timer(Id, TimerMap), acc_num = AccNum + 1, flight_num = FlightNum - 1}, Actions}; %% 收到重发过期请求 -handle_event(info, {timeout, _, {repost_ticker, NorthData = #north_data{ref = Ref}}}, connected, State = #state{postman = {_, PostmanPid}, timer_map = TimerMap}) -> +handle_event(info, {timeout, _, {repost_ticker, NorthData = #north_data{id = Id}}}, connected, State = #state{endpoint = #endpoint{name = Name}, postman_pid = PostmanPid, timer_map = TimerMap}) -> + lager:debug("[iot_endpoint] endpoint: ~p, repost data: ~p", [Name, Id]), PostmanPid ! {post, NorthData}, %% 5秒后重发 TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), - {keep_state, State#state{timer_map = maps:put(Ref, TimerRef, TimerMap)}}; + {keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}}; %% 离线时,忽略超时逻辑 handle_event(info, {timeout, _, {repost_ticker, _}}, disconnected, State) -> {keep_state, State}; -handle_event(info, {timeout, _, recreate_postman}, disconnected, State = #state{endpoint = Endpoint, ack_map = AckMap, timer_map = TimerMap, q = Q}) -> - lager:debug("[iot_endpoint] recreate postman: ~p", [Endpoint]), - try create_postman(Endpoint) of - {ok, Postman = {_, PostmanPid}} -> - lager:debug("[iot_endpoint] queue data is: ~p", [queue:to_list(Q)]), - %% 发送缓存区中的所有数据 - {NAckMap, NTimerMap} = lists:foldl(fun(NorthData = #north_data{ref = Ref}, {AckMap0, TimerMap0}) -> - PostmanPid ! {post, NorthData}, - TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), - {maps:put(Ref, NorthData, AckMap0), maps:put(Ref, TimerRef, TimerMap0)} - end, {AckMap, TimerMap}, queue:to_list(Q)), - %% 需要清空当前的队列 - {next_state, connected, State#state{endpoint = Endpoint, postman = Postman, ack_map = NAckMap, timer_map = NTimerMap, q = queue:new()}} - catch _:Reason -> - lager:warning("[iot_endpoint] recreate postman get error: ~p", [Reason]), - erlang:start_timer(?RETRY_INTERVAL, self(), recreate_postman), +handle_event(info, {timeout, _, create_postman}, disconnected, State = #state{endpoint = Endpoint = #endpoint{name = Name}, window_size = WindowSize}) -> + lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Name]), + try + {ok, PostmanPid} = create_postman(Endpoint), + %% 最多允许window_size + Actions = lists:map(fun(_) -> {next_event, info, fetch_next} end, lists:seq(1, WindowSize)), + {next_state, connected, State#state{endpoint = Endpoint, postman_pid = PostmanPid, timer_map = maps:new(), flight_num = 0}, Actions} + catch _:Error -> + lager:warning("[iot_endpoint] endpoint: ~p, create postman get error: ~p", [Name, Error]), + erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), - {keep_state, State#state{endpoint = Endpoint, postman = undefined}} + {keep_state, State#state{endpoint = Endpoint, postman_pid = undefined}} end; +%% 删除时需要清理 +handle_event({call, From}, clean_up, _, State = #state{tab_name = TabName}) -> + mnesia:delete_table(TabName), + {keep_state, State, [{reply, From, ok}]}; + %% 获取当前统计信息 -handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum, ack_map = AckMap, q = Q}) -> +handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum, tab_name = TabName}) -> Stat = #{ <<"acc_num">> => AccNum, - <<"unconfirmed_num">> => maps:size(AckMap), - <<"queue_num">> => queue:len(Q), - <<"state">> => atom_to_binary(StateName) + <<"queue_num">> => mnesia_queue:table_size(TabName), + <<"state_name">> => atom_to_binary(StateName) }, {keep_state, State, [{reply, From, Stat}]}; %% 所有未确认的消息进入队列里面, 这里不保证消息的顺序 -handle_event(info, {'EXIT', PostmanPid, Reason}, connected, State = #state{ack_map = AckMap, postman = {_, PostmanPid}}) -> - lager:warning("[iot_endpoint] postman exited, current ack_map: ~p, reason: ~p", [AckMap, Reason]), - NState = stash(State), +handle_event(info, {'EXIT', PostmanPid, Reason}, connected, State = #state{endpoint = #endpoint{name = Name}, timer_map = TimerMap, postman_pid = PostmanPid}) -> + lager:warning("[iot_endpoint] endpoint: ~p, postman exited with reason: ~p", [Name, Reason]), + lists:foreach(fun({_, TimerRef}) -> catch erlang:cancel_timer(TimerRef) end, maps:to_list(TimerMap)), + erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), - erlang:start_timer(?RETRY_INTERVAL, self(), recreate_postman), - {next_state, disconnected, NState#state{postman = undefined}}; + {next_state, disconnected, State#state{timer_map = maps:new(), postman_pid = undefined}}; %% @private %% @doc If callback_mode is handle_event_function, then whenever a @@ -229,20 +255,20 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== -%% 将所有未处理的消息暂存 -stash(State = #state{ack_map = AckMap, timer_map = TimerMap, q = Q}) -> - Q1 = lists:foldl(fun({_, NorthData}, Q0) -> queue:in(NorthData, Q0) end, Q, maps:to_list(AckMap)), - %% 清空所有的timer - lists:foreach(fun({_, TimerRef}) -> catch erlang:cancel_timer(TimerRef) end, maps:to_list(TimerMap)), - - State#state{q = Q1, ack_map = #{}, postman = undefined}. +-spec remove_timer(Id :: integer(), TimerMap :: #{}) -> NTimerMap :: #{}. +remove_timer(Id, TimerMap) when is_integer(Id), is_map(TimerMap) -> + case maps:take(Id, TimerMap) of + error -> + TimerMap; + {TimerRef, TimerMap0} -> + catch erlang:cancel_timer(TimerRef), + TimerMap0 + end. %% 对http和https协议的支持 create_postman(#endpoint{name = Name, config = #http_endpoint{url = Url, pool_size = PoolSize}}) -> PoolName = binary_to_atom(<<"http_pool:", Name/binary>>), - {ok, PostmanPid} = http_postman:start_link(self(), Url, PoolName, PoolSize), - - {ok, {http, PostmanPid}}; + http_postman:start_link(self(), Url, PoolName, PoolSize); %% 对mqtt协议的支持 create_postman(#endpoint{name = Name, config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}}) -> @@ -262,8 +288,6 @@ create_postman(#endpoint{name = Name, config = #mqtt_endpoint{host = Host, port {retry_interval, 5} ], - {ok, PostmanPid} = mqtt_postman:start_link(self(), Opts, Topic, Qos), - - {ok, {mqtt, PostmanPid}}; + mqtt_postman:start_link(self(), Opts, Topic, Qos); create_postman(#endpoint{}) -> throw(<<"not supported">>). \ No newline at end of file diff --git a/apps/iot/src/mnesia/mnesia_queue.erl b/apps/iot/src/mnesia/mnesia_queue.erl new file mode 100644 index 0000000..a55947d --- /dev/null +++ b/apps/iot/src/mnesia/mnesia_queue.erl @@ -0,0 +1,64 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 26. 7月 2023 10:40 +%%%------------------------------------------------------------------- +-module(mnesia_queue). +-author("aresei"). +-include("iot.hrl"). + +%% API +-export([insert/2, delete/2, ensure_queue/1, table_size/1, dirty_fetch_next/2]). + +-spec insert(Tab :: atom(), #north_data{}) -> ok | {error, Reason :: any()}. +insert(Tab, Item = #north_data{}) -> + Id = mnesia_id_generator:next_id(Tab), + NItem = Item#north_data{id = Id}, + case mnesia:transaction(fun() -> mnesia:write(Tab, NItem, write) end) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + {error, Reason} + end. + +-spec delete(Tab :: atom(), Key :: any()) -> ok | {error, Reason :: any()}. +delete(Tab, Key) when is_atom(Tab) -> + case mnesia:transaction(fun() -> mnesia:delete(Tab, Key, write) end) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + {error, Reason} + end. + +%% 确保数据已经建立 +-spec ensure_queue(Name :: atom()) -> no_return(). +ensure_queue(Name) when is_atom(Name) -> + Tables = mnesia:system_info(tables), + case lists:member(Name, Tables) of + true -> + mnesia:wait_for_tables([Name], infinity); + false -> + mnesia:create_table(Name, [ + {attributes, record_info(fields, north_data)}, + {record_name, north_data}, + {disc_copies, [node()]}, + {type, ordered_set} + ]) + end. + +-spec table_size(Tab :: atom()) -> integer(). +table_size(Tab) when is_atom(Tab) -> + mnesia:table_info(Tab, size). + +-spec dirty_fetch_next(Tab :: atom(), Cursor :: integer()) -> {ok, NCursor :: integer(), Item :: any()} | '$end_of_table'. +dirty_fetch_next(Tab, Cursor) when is_atom(Tab), is_integer(Cursor) -> + case mnesia:dirty_next(Tab, Cursor) of + '$end_of_table' -> + '$end_of_table'; + NextKey -> + [Item] = mnesia:dirty_read(Tab, NextKey), + {ok, NextKey, Item} + end. diff --git a/apps/iot/src/mocker/iot_mock.erl b/apps/iot/src/mocker/iot_mock.erl index 155c9a0..e642c50 100644 --- a/apps/iot/src/mocker/iot_mock.erl +++ b/apps/iot/src/mocker/iot_mock.erl @@ -13,11 +13,29 @@ %% API -export([rsa_encode/1]). -export([insert_services/1]). --export([insert_endpoints/0]). +-export([insert_endpoints/0, forward/0]). + +forward() -> + Name = <<"zhongguodianli">>, + Pid = iot_endpoint:get_pid(Name), + + lists:foreach(fun(Id0) -> + Id = integer_to_binary(Id0), + Fields = [ + #{ + <<"key">> => <<"test:", Id/binary>>, + <<"value">> => Id, + <<"unit">> => <<"cm">> + } + ], + iot_endpoint:forward(Pid, <<"location_code:", Id/binary>>, Fields) + end, lists:seq(1, 10000)). insert_endpoints() -> Mapper0 = "fun(LocationCode, Fields) -> - Bin = jiffy:encode(Fields#{<<\"location_code\">> => LocationCode}, [force_utf8]), + Fields1 = lists:map(fun(#{<<\"key\">> := Key, <<\"value\">> := Val}) -> {Key, Val} end, Fields), + Fields2 = maps:from_list(Fields1), + Bin = jiffy:encode(Fields2#{<<\"location_code\">> => LocationCode}, [force_utf8]), iolist_to_binary(Bin) end.", diff --git a/apps/iot/src/postman/http_postman.erl b/apps/iot/src/postman/http_postman.erl index dcbab9c..b2e6af3 100644 --- a/apps/iot/src/postman/http_postman.erl +++ b/apps/iot/src/postman/http_postman.erl @@ -87,11 +87,11 @@ handle_info(stop, State = #state{pool_name = PoolName, worker_pool_pid = WorkerP {stop, normal, State}; -handle_info({post, #north_data{body = Body, ref = Ref}}, State = #state{parent_pid = ParentPid, url = Url, pool_name = PoolName, worker_pool_pid = WorkerPoolPid}) -> +handle_info({post, #north_data{body = Body, id = Id}}, State = #state{parent_pid = ParentPid, url = Url, pool_name = PoolName, worker_pool_pid = WorkerPoolPid}) -> poolboy:transaction(WorkerPoolPid, fun(Pid) -> case http_postman_worker:post(Pid, Url, Body, PoolName) of ok -> - ParentPid ! {ack, Ref}; + ParentPid ! {ack, Id}; {error, Reason} -> lager:debug("[http_postman] post url: ~p, body: ~p, get error: ~p", [Url, Body, Reason]) end diff --git a/apps/iot/src/postman/mqtt_postman.erl b/apps/iot/src/postman/mqtt_postman.erl index af537ee..0d2c8f1 100644 --- a/apps/iot/src/postman/mqtt_postman.erl +++ b/apps/iot/src/postman/mqtt_postman.erl @@ -90,9 +90,9 @@ handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload}}, {noreply, State}; handle_info({puback, Packet = #{packet_id := PacketId}}, State = #state{parent_pid = ParentPid, inflight = Inflight}) -> case maps:take(PacketId, Inflight) of - {{Ref, Message}, RestInflight} -> + {{Id, Message}, RestInflight} -> lager:debug("[mqtt_postman] receive puback packet: ~p, assoc message: ~p", [Packet, Message]), - ParentPid ! {ack, Ref}, + ParentPid ! {ack, Id}, {noreply, State#state{inflight = RestInflight}}; error -> lager:warning("[mqtt_postman] receive unknown puback packet: ~p", [Packet]), @@ -100,16 +100,16 @@ handle_info({puback, Packet = #{packet_id := PacketId}}, State = #state{parent_p end; %% 转发信息 -handle_info({post, #north_data{ref = Ref, location_code = LocationCode, body = Message}}, State = #state{parent_pid = ParentPid, conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) -> +handle_info({post, #north_data{id = Id, location_code = LocationCode, body = Message}}, State = #state{parent_pid = ParentPid, conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) -> Topic = re:replace(Topic0, <<"\\${location_code}">>, LocationCode, [global, {return, binary}]), lager:debug("[mqtt_postman] will publish topic: ~p, message: ~p, qos: ~p", [Topic, Message, Qos]), case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of ok -> - ParentPid ! {ack, Ref}, + ParentPid ! {ack, Id}, {noreply, State}; {ok, PacketId} -> lager:debug("[mqtt_postman] send success, packet_id: ~p", [PacketId]), - {noreply, State#state{inflight = maps:put(PacketId, {Ref, Message}, InFlight)}}; + {noreply, State#state{inflight = maps:put(PacketId, {Id, Message}, InFlight)}}; {error, Reason} -> lager:warning("[mqtt_postman] send message to topic: ~p, get error: ~p", [Topic, Reason]), {stop, Reason, State}