diff --git a/apps/iot/src/iot_endpoint.erl b/apps/iot/src/iot_endpoint.erl index 982e3e5..34aa862 100644 --- a/apps/iot/src/iot_endpoint.erl +++ b/apps/iot/src/iot_endpoint.erl @@ -299,7 +299,7 @@ remove_timer(Id, TimerMap) when is_integer(Id), is_map(TimerMap) -> %% 对http和https协议的支持 create_postman(#endpoint{config = #http_endpoint{url = Url, pool_size = PoolSize}}) -> WorkerArgs = [{url, Url}], - broker_postman:start_link(self(), http_postman, WorkerArgs, PoolSize); + broker_postman:start_link(http_postman, WorkerArgs, PoolSize); %% 对mqtt协议的支持, 只需要建立单个链接 create_postman(#endpoint{name = Name, config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}}) -> @@ -319,7 +319,7 @@ create_postman(#endpoint{name = Name, config = #mqtt_endpoint{host = Host, port {retry_interval, 5} ], - mqtt_postman:start_link(self(), Opts, Topic, Qos); + mqtt_postman:start_link(Opts, Topic, Qos); %% 对mysql协议的支持 create_postman(#endpoint{config = #mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName, pool_size = PoolSize}}) -> @@ -335,7 +335,7 @@ create_postman(#endpoint{config = #mysql_endpoint{host = Host, port = Port, user ]}, {table, TableName} ], - broker_postman:start_link(self(), mysql_postman, WorkerArgs, PoolSize); + broker_postman:start_link(mysql_postman, WorkerArgs, PoolSize); create_postman(#endpoint{}) -> throw(<<"not supported">>). diff --git a/apps/iot/src/postman/broker_postman.erl b/apps/iot/src/postman/broker_postman.erl index 82e180f..764cbca 100644 --- a/apps/iot/src/postman/broker_postman.erl +++ b/apps/iot/src/postman/broker_postman.erl @@ -13,13 +13,12 @@ -behaviour(gen_server). %% API --export([start_link/4]). +-export([start_link/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -record(state, { - parent_pid :: pid(), pool_pid :: pid() }). @@ -28,10 +27,10 @@ %%%=================================================================== %% @doc Spawns the server and registers the local name (unique) --spec(start_link(ParentPid :: pid(), Mod :: atom(), WorkerArgs :: list(), PoolSize :: integer()) -> +-spec(start_link(Mod :: atom(), WorkerArgs :: list(), PoolSize :: integer()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(ParentPid, Mod, WorkerArgs, PoolSize) when is_pid(ParentPid), is_atom(Mod), is_list(WorkerArgs), is_integer(PoolSize) -> - gen_server:start_link(?MODULE, [ParentPid, Mod, WorkerArgs, PoolSize], []). +start_link(Mod, WorkerArgs, PoolSize) when is_atom(Mod), is_list(WorkerArgs), is_integer(PoolSize) -> + gen_server:start_link(?MODULE, [Mod, WorkerArgs, PoolSize], []). %%%=================================================================== %%% gen_server callbacks @@ -42,11 +41,11 @@ start_link(ParentPid, Mod, WorkerArgs, PoolSize) when is_pid(ParentPid), is_atom -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([ParentPid, Mod, WorkerArgs, PoolSize]) -> +init([Mod, WorkerArgs, PoolSize]) -> %% 启动工作的线程池 {ok, PoolPid} = poolboy:start_link([{size, PoolSize}, {max_overflow, PoolSize}, {worker_module, Mod}], WorkerArgs), - {ok, #state{parent_pid = ParentPid, pool_pid = PoolPid}}. + {ok, #state{pool_pid = PoolPid}}. %% @private %% @doc Handling call messages @@ -76,13 +75,12 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info({post, ReceiverPid, NorthData}, State = #state{pool_pid = PoolPid}) -> - poolboy:transaction(PoolPid, fun(Pid) -> Pid ! {post, ReceiverPid, NorthData} end), +handle_info({post, ReceiverPid, PostData}, State = #state{pool_pid = PoolPid}) -> + poolboy:transaction(PoolPid, fun(Pid) -> Pid ! {post, ReceiverPid, PostData} end), {noreply, State}; handle_info(stop, State = #state{pool_pid = PoolPid}) -> catch poolboy:stop(PoolPid), - {stop, normal, State}. %% @private diff --git a/apps/iot/src/postman/http_postman.erl b/apps/iot/src/postman/http_postman.erl index 22accb7..0735ca0 100644 --- a/apps/iot/src/postman/http_postman.erl +++ b/apps/iot/src/postman/http_postman.erl @@ -80,11 +80,13 @@ handle_info({post, ReceiverPid, #post_data{id = Id, body = Body}}, State = #stat case hackney:request(post, Url, Headers, Body) of {ok, 200, _, ClientRef} -> {ok, RespBody} = hackney:body(ClientRef), + hackney:close(ClientRef), lager:debug("[http_postman] url: ~p, response is: ~p", [Url, RespBody]), ReceiverPid ! {ack, Id}, {noreply, State}; {ok, HttpCode, _, ClientRef} -> {ok, RespBody} = hackney:body(ClientRef), + hackney:close(ClientRef), lager:debug("[http_postman] url: ~p, http_code: ~p, response is: ~p", [Url, HttpCode, RespBody]), {noreply, State}; {error, Reason} -> diff --git a/apps/iot/src/postman/mqtt_postman.erl b/apps/iot/src/postman/mqtt_postman.erl index b9ee752..0fdfd53 100644 --- a/apps/iot/src/postman/mqtt_postman.erl +++ b/apps/iot/src/postman/mqtt_postman.erl @@ -13,13 +13,12 @@ -behaviour(gen_server). %% API --export([start_link/4]). +-export([start_link/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -record(state, { - parent_pid :: pid(), conn_pid :: pid(), topic :: binary(), qos = 0 :: integer(), @@ -31,10 +30,10 @@ %%%=================================================================== %% @doc Spawns the server and registers the local name (unique) --spec(start_link(ParentPid :: pid(), Opts :: list(), Topic :: binary(), Qos :: integer()) -> +-spec(start_link(Opts :: list(), Topic :: binary(), Qos :: integer()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(ParentPid, Opts, Topic, Qos) when is_pid(ParentPid), is_list(Opts), is_binary(Topic), Qos == 0; Qos == 1; Qos == 2 -> - gen_server:start_link(?MODULE, [ParentPid, Opts, Topic, Qos], []). +start_link(Opts, Topic, Qos) when is_list(Opts), is_binary(Topic), Qos == 0; Qos == 1; Qos == 2 -> + gen_server:start_link(?MODULE, [Opts, Topic, Qos], []). %%%=================================================================== %%% gen_server callbacks @@ -45,14 +44,14 @@ start_link(ParentPid, Opts, Topic, Qos) when is_pid(ParentPid), is_list(Opts), i -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([ParentPid, Opts, Topic, Qos]) -> +init([Opts, Topic, Qos]) -> Opts1 = [{owner, self()} | Opts], {ok, ConnPid} = emqtt:start_link(Opts1), lager:debug("[mqtt_postman] start connect, options: ~p", [Opts1]), {ok, _} = emqtt:connect(ConnPid), lager:debug("[mqtt_postman] connect success, pid: ~p", [ConnPid]), - {ok, #state{parent_pid = ParentPid, conn_pid = ConnPid, topic = Topic, qos = Qos}}. + {ok, #state{conn_pid = ConnPid, topic = Topic, qos = Qos}}. %% @private %% @doc Handling call messages @@ -88,14 +87,12 @@ handle_info({disconnected, ReasonCode, Properties}, State = #state{}) -> handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload}}, State = #state{conn_pid = _ConnPid}) -> lager:debug("[mqtt_postman] Recv a publish packet: ~p, payload: ~p", [Message, Payload]), {noreply, State}; -handle_info({puback, Packet = #{packet_id := PacketId}}, State = #state{parent_pid = ParentPid, inflight = Inflight}) -> +handle_info({puback, #{packet_id := PacketId}}, State = #state{inflight = Inflight}) -> case maps:take(PacketId, Inflight) of - {{Id, Message}, RestInflight} -> - lager:debug("[mqtt_postman] receive puback packet: ~p, assoc message: ~p", [Packet, Message]), - ParentPid ! {ack, Id}, + {{Id, ReceiverPid}, RestInflight} -> + ReceiverPid ! {ack, Id}, {noreply, State#state{inflight = RestInflight}}; error -> - lager:warning("[mqtt_postman] receive unknown puback packet: ~p", [Packet]), {noreply, State} end; @@ -108,8 +105,7 @@ handle_info({post, ReceiverPid, #post_data{id = Id, location_code = LocationCode ReceiverPid ! {ack, Id}, {noreply, State}; {ok, PacketId} -> - lager:debug("[mqtt_postman] send success, packet_id: ~p", [PacketId]), - {noreply, State#state{inflight = maps:put(PacketId, {Id, Message}, InFlight)}}; + {noreply, State#state{inflight = maps:put(PacketId, {Id, ReceiverPid}, InFlight)}}; {error, Reason} -> lager:warning("[mqtt_postman] send message to topic: ~p, get error: ~p", [Topic, Reason]), {stop, Reason, State} @@ -118,7 +114,7 @@ handle_info(stop, State) -> {stop, normal, State}; handle_info(Info, State = #state{}) -> - lager:debug("[mqtt_postman] get info: ~p", [Info]), + lager:notice("[mqtt_postman] get unknown info: ~p", [Info]), {noreply, State}. %% @private