remove unless code
This commit is contained in:
parent
40421eddc8
commit
14df94975e
@ -299,7 +299,7 @@ remove_timer(Id, TimerMap) when is_integer(Id), is_map(TimerMap) ->
|
||||
%% 对http和https协议的支持
|
||||
create_postman(#endpoint{config = #http_endpoint{url = Url, pool_size = PoolSize}}) ->
|
||||
WorkerArgs = [{url, Url}],
|
||||
broker_postman:start_link(self(), http_postman, WorkerArgs, PoolSize);
|
||||
broker_postman:start_link(http_postman, WorkerArgs, PoolSize);
|
||||
|
||||
%% 对mqtt协议的支持, 只需要建立单个链接
|
||||
create_postman(#endpoint{name = Name, config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}}) ->
|
||||
@ -319,7 +319,7 @@ create_postman(#endpoint{name = Name, config = #mqtt_endpoint{host = Host, port
|
||||
{retry_interval, 5}
|
||||
],
|
||||
|
||||
mqtt_postman:start_link(self(), Opts, Topic, Qos);
|
||||
mqtt_postman:start_link(Opts, Topic, Qos);
|
||||
|
||||
%% 对mysql协议的支持
|
||||
create_postman(#endpoint{config = #mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName, pool_size = PoolSize}}) ->
|
||||
@ -335,7 +335,7 @@ create_postman(#endpoint{config = #mysql_endpoint{host = Host, port = Port, user
|
||||
]},
|
||||
{table, TableName}
|
||||
],
|
||||
broker_postman:start_link(self(), mysql_postman, WorkerArgs, PoolSize);
|
||||
broker_postman:start_link(mysql_postman, WorkerArgs, PoolSize);
|
||||
create_postman(#endpoint{}) ->
|
||||
throw(<<"not supported">>).
|
||||
|
||||
|
||||
@ -13,13 +13,12 @@
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([start_link/4]).
|
||||
-export([start_link/3]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
-record(state, {
|
||||
parent_pid :: pid(),
|
||||
pool_pid :: pid()
|
||||
}).
|
||||
|
||||
@ -28,10 +27,10 @@
|
||||
%%%===================================================================
|
||||
|
||||
%% @doc Spawns the server and registers the local name (unique)
|
||||
-spec(start_link(ParentPid :: pid(), Mod :: atom(), WorkerArgs :: list(), PoolSize :: integer()) ->
|
||||
-spec(start_link(Mod :: atom(), WorkerArgs :: list(), PoolSize :: integer()) ->
|
||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||
start_link(ParentPid, Mod, WorkerArgs, PoolSize) when is_pid(ParentPid), is_atom(Mod), is_list(WorkerArgs), is_integer(PoolSize) ->
|
||||
gen_server:start_link(?MODULE, [ParentPid, Mod, WorkerArgs, PoolSize], []).
|
||||
start_link(Mod, WorkerArgs, PoolSize) when is_atom(Mod), is_list(WorkerArgs), is_integer(PoolSize) ->
|
||||
gen_server:start_link(?MODULE, [Mod, WorkerArgs, PoolSize], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_server callbacks
|
||||
@ -42,11 +41,11 @@ start_link(ParentPid, Mod, WorkerArgs, PoolSize) when is_pid(ParentPid), is_atom
|
||||
-spec(init(Args :: term()) ->
|
||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term()} | ignore).
|
||||
init([ParentPid, Mod, WorkerArgs, PoolSize]) ->
|
||||
init([Mod, WorkerArgs, PoolSize]) ->
|
||||
%% 启动工作的线程池
|
||||
{ok, PoolPid} = poolboy:start_link([{size, PoolSize}, {max_overflow, PoolSize}, {worker_module, Mod}], WorkerArgs),
|
||||
|
||||
{ok, #state{parent_pid = ParentPid, pool_pid = PoolPid}}.
|
||||
{ok, #state{pool_pid = PoolPid}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling call messages
|
||||
@ -76,13 +75,12 @@ handle_cast(_Request, State = #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info({post, ReceiverPid, NorthData}, State = #state{pool_pid = PoolPid}) ->
|
||||
poolboy:transaction(PoolPid, fun(Pid) -> Pid ! {post, ReceiverPid, NorthData} end),
|
||||
handle_info({post, ReceiverPid, PostData}, State = #state{pool_pid = PoolPid}) ->
|
||||
poolboy:transaction(PoolPid, fun(Pid) -> Pid ! {post, ReceiverPid, PostData} end),
|
||||
{noreply, State};
|
||||
|
||||
handle_info(stop, State = #state{pool_pid = PoolPid}) ->
|
||||
catch poolboy:stop(PoolPid),
|
||||
|
||||
{stop, normal, State}.
|
||||
|
||||
%% @private
|
||||
|
||||
@ -80,11 +80,13 @@ handle_info({post, ReceiverPid, #post_data{id = Id, body = Body}}, State = #stat
|
||||
case hackney:request(post, Url, Headers, Body) of
|
||||
{ok, 200, _, ClientRef} ->
|
||||
{ok, RespBody} = hackney:body(ClientRef),
|
||||
hackney:close(ClientRef),
|
||||
lager:debug("[http_postman] url: ~p, response is: ~p", [Url, RespBody]),
|
||||
ReceiverPid ! {ack, Id},
|
||||
{noreply, State};
|
||||
{ok, HttpCode, _, ClientRef} ->
|
||||
{ok, RespBody} = hackney:body(ClientRef),
|
||||
hackney:close(ClientRef),
|
||||
lager:debug("[http_postman] url: ~p, http_code: ~p, response is: ~p", [Url, HttpCode, RespBody]),
|
||||
{noreply, State};
|
||||
{error, Reason} ->
|
||||
|
||||
@ -13,13 +13,12 @@
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([start_link/4]).
|
||||
-export([start_link/3]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
-record(state, {
|
||||
parent_pid :: pid(),
|
||||
conn_pid :: pid(),
|
||||
topic :: binary(),
|
||||
qos = 0 :: integer(),
|
||||
@ -31,10 +30,10 @@
|
||||
%%%===================================================================
|
||||
|
||||
%% @doc Spawns the server and registers the local name (unique)
|
||||
-spec(start_link(ParentPid :: pid(), Opts :: list(), Topic :: binary(), Qos :: integer()) ->
|
||||
-spec(start_link(Opts :: list(), Topic :: binary(), Qos :: integer()) ->
|
||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||
start_link(ParentPid, Opts, Topic, Qos) when is_pid(ParentPid), is_list(Opts), is_binary(Topic), Qos == 0; Qos == 1; Qos == 2 ->
|
||||
gen_server:start_link(?MODULE, [ParentPid, Opts, Topic, Qos], []).
|
||||
start_link(Opts, Topic, Qos) when is_list(Opts), is_binary(Topic), Qos == 0; Qos == 1; Qos == 2 ->
|
||||
gen_server:start_link(?MODULE, [Opts, Topic, Qos], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_server callbacks
|
||||
@ -45,14 +44,14 @@ start_link(ParentPid, Opts, Topic, Qos) when is_pid(ParentPid), is_list(Opts), i
|
||||
-spec(init(Args :: term()) ->
|
||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term()} | ignore).
|
||||
init([ParentPid, Opts, Topic, Qos]) ->
|
||||
init([Opts, Topic, Qos]) ->
|
||||
Opts1 = [{owner, self()} | Opts],
|
||||
{ok, ConnPid} = emqtt:start_link(Opts1),
|
||||
lager:debug("[mqtt_postman] start connect, options: ~p", [Opts1]),
|
||||
{ok, _} = emqtt:connect(ConnPid),
|
||||
lager:debug("[mqtt_postman] connect success, pid: ~p", [ConnPid]),
|
||||
|
||||
{ok, #state{parent_pid = ParentPid, conn_pid = ConnPid, topic = Topic, qos = Qos}}.
|
||||
{ok, #state{conn_pid = ConnPid, topic = Topic, qos = Qos}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling call messages
|
||||
@ -88,14 +87,12 @@ handle_info({disconnected, ReasonCode, Properties}, State = #state{}) ->
|
||||
handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload}}, State = #state{conn_pid = _ConnPid}) ->
|
||||
lager:debug("[mqtt_postman] Recv a publish packet: ~p, payload: ~p", [Message, Payload]),
|
||||
{noreply, State};
|
||||
handle_info({puback, Packet = #{packet_id := PacketId}}, State = #state{parent_pid = ParentPid, inflight = Inflight}) ->
|
||||
handle_info({puback, #{packet_id := PacketId}}, State = #state{inflight = Inflight}) ->
|
||||
case maps:take(PacketId, Inflight) of
|
||||
{{Id, Message}, RestInflight} ->
|
||||
lager:debug("[mqtt_postman] receive puback packet: ~p, assoc message: ~p", [Packet, Message]),
|
||||
ParentPid ! {ack, Id},
|
||||
{{Id, ReceiverPid}, RestInflight} ->
|
||||
ReceiverPid ! {ack, Id},
|
||||
{noreply, State#state{inflight = RestInflight}};
|
||||
error ->
|
||||
lager:warning("[mqtt_postman] receive unknown puback packet: ~p", [Packet]),
|
||||
{noreply, State}
|
||||
end;
|
||||
|
||||
@ -108,8 +105,7 @@ handle_info({post, ReceiverPid, #post_data{id = Id, location_code = LocationCode
|
||||
ReceiverPid ! {ack, Id},
|
||||
{noreply, State};
|
||||
{ok, PacketId} ->
|
||||
lager:debug("[mqtt_postman] send success, packet_id: ~p", [PacketId]),
|
||||
{noreply, State#state{inflight = maps:put(PacketId, {Id, Message}, InFlight)}};
|
||||
{noreply, State#state{inflight = maps:put(PacketId, {Id, ReceiverPid}, InFlight)}};
|
||||
{error, Reason} ->
|
||||
lager:warning("[mqtt_postman] send message to topic: ~p, get error: ~p", [Topic, Reason]),
|
||||
{stop, Reason, State}
|
||||
@ -118,7 +114,7 @@ handle_info(stop, State) ->
|
||||
{stop, normal, State};
|
||||
|
||||
handle_info(Info, State = #state{}) ->
|
||||
lager:debug("[mqtt_postman] get info: ~p", [Info]),
|
||||
lager:notice("[mqtt_postman] get unknown info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user