fix postman
This commit is contained in:
parent
11fe527135
commit
b18a244a19
@ -92,17 +92,17 @@ handle_info({next_data, Id, {ServiceId, Metric}}, State = #state{buffer = Buffer
|
|||||||
{ok, 200, _, ClientRef} ->
|
{ok, 200, _, ClientRef} ->
|
||||||
{ok, RespBody} = hackney:body(ClientRef),
|
{ok, RespBody} = hackney:body(ClientRef),
|
||||||
hackney:close(ClientRef),
|
hackney:close(ClientRef),
|
||||||
lager:debug("[http_postman] url: ~p, response is: ~p", [Url, RespBody]),
|
lager:debug("[endpoint_http] url: ~p, response is: ~p", [Url, RespBody]),
|
||||||
NBuffer = endpoint_buffer:ack(Id, Buffer),
|
NBuffer = endpoint_buffer:ack(Id, Buffer),
|
||||||
|
|
||||||
{noreply, State#state{buffer = NBuffer}};
|
{noreply, State#state{buffer = NBuffer}};
|
||||||
{ok, HttpCode, _, ClientRef} ->
|
{ok, HttpCode, _, ClientRef} ->
|
||||||
{ok, RespBody} = hackney:body(ClientRef),
|
{ok, RespBody} = hackney:body(ClientRef),
|
||||||
hackney:close(ClientRef),
|
hackney:close(ClientRef),
|
||||||
lager:debug("[http_postman] url: ~p, http_code: ~p, response is: ~p", [Url, HttpCode, RespBody]),
|
lager:debug("[endpoint_http] url: ~p, http_code: ~p, response is: ~p", [Url, HttpCode, RespBody]),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
lager:warning("[http_postman] url: ~p, get error: ~p", [Url, Reason]),
|
lager:warning("[endpoint_http] url: ~p, get error: ~p", [Url, Reason]),
|
||||||
{noreply, State}
|
{noreply, State}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|||||||
@ -93,7 +93,7 @@ handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) ->
|
|||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DISCONNECTED, client_id = ClientId,
|
handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DISCONNECTED, client_id = ClientId,
|
||||||
endpoint = #endpoint{title = Title, config = #kafka_endpoint{username = Username, password = Password, mechanism = Mechanism, bootstrap_servers = BootstrapServers, topic = Topic}}}) ->
|
endpoint = #endpoint{title = Title, config = #kafka_endpoint{username = Username, password = Password, mechanism = Mechanism, bootstrap_servers = BootstrapServers, topic = Topic}}}) ->
|
||||||
lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Title]),
|
lager:debug("[endpoint_kafka] endpoint: ~p, create postman", [Title]),
|
||||||
|
|
||||||
ClientConfig = [
|
ClientConfig = [
|
||||||
{reconnect_cool_down_seconds, 5},
|
{reconnect_cool_down_seconds, 5},
|
||||||
|
|||||||
@ -94,7 +94,7 @@ handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) ->
|
|||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status = ?DISCONNECTED,
|
handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status = ?DISCONNECTED,
|
||||||
endpoint = #endpoint{title = Title, config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, client_id = ClientId}}}) ->
|
endpoint = #endpoint{title = Title, config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, client_id = ClientId}}}) ->
|
||||||
lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Title]),
|
lager:debug("[endpoint_mqtt] endpoint: ~p, create postman", [Title]),
|
||||||
Opts = [
|
Opts = [
|
||||||
{owner, self()},
|
{owner, self()},
|
||||||
{clientid, ClientId},
|
{clientid, ClientId},
|
||||||
@ -111,14 +111,14 @@ handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status
|
|||||||
],
|
],
|
||||||
|
|
||||||
{ok, ConnPid} = emqtt:start_link(Opts),
|
{ok, ConnPid} = emqtt:start_link(Opts),
|
||||||
lager:debug("[mqtt_postman] start connect, options: ~p", [Opts]),
|
lager:debug("[endpoint_mqtt] start connect, options: ~p", [Opts]),
|
||||||
case emqtt:connect(ConnPid, 5000) of
|
case emqtt:connect(ConnPid, 5000) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
lager:debug("[mqtt_postman] connect success, pid: ~p", [ConnPid]),
|
lager:debug("[endpoint_mqtt] connect success, pid: ~p", [ConnPid]),
|
||||||
NBuffer = endpoint_buffer:trigger_n(Buffer),
|
NBuffer = endpoint_buffer:trigger_n(Buffer),
|
||||||
{noreply, State#state{conn_pid = ConnPid, buffer = NBuffer, status = ?CONNECTED}};
|
{noreply, State#state{conn_pid = ConnPid, buffer = NBuffer, status = ?CONNECTED}};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
lager:warning("[mqtt_postman] connect get error: ~p", [Reason]),
|
lager:warning("[endpoint_mqtt] connect get error: ~p", [Reason]),
|
||||||
erlang:start_timer(5000, self(), create_postman),
|
erlang:start_timer(5000, self(), create_postman),
|
||||||
{noreply, State}
|
{noreply, State}
|
||||||
end;
|
end;
|
||||||
@ -131,7 +131,7 @@ handle_info({next_data, Id, {ServiceId, Metric}}, State = #state{status = ?CONNE
|
|||||||
endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic0, qos = Qos}}}) ->
|
endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic0, qos = Qos}}}) ->
|
||||||
|
|
||||||
Topic = re:replace(Topic0, <<"\\${service_id}">>, ServiceId, [global, {return, binary}]),
|
Topic = re:replace(Topic0, <<"\\${service_id}">>, ServiceId, [global, {return, binary}]),
|
||||||
lager:debug("[mqtt_postman] will publish topic: ~p, metric: ~p, qos: ~p", [Topic, Metric, Qos]),
|
lager:debug("[endpoint_mqtt] will publish topic: ~p, metric: ~p, qos: ~p", [Topic, Metric, Qos]),
|
||||||
case emqtt:publish(ConnPid, Topic, #{}, Metric, [{qos, Qos}, {retain, true}]) of
|
case emqtt:publish(ConnPid, Topic, #{}, Metric, [{qos, Qos}, {retain, true}]) of
|
||||||
ok ->
|
ok ->
|
||||||
NBuffer = endpoint_buffer:ack(Id, Buffer),
|
NBuffer = endpoint_buffer:ack(Id, Buffer),
|
||||||
@ -139,17 +139,17 @@ handle_info({next_data, Id, {ServiceId, Metric}}, State = #state{status = ?CONNE
|
|||||||
{ok, PacketId} ->
|
{ok, PacketId} ->
|
||||||
{noreply, State#state{inflight = maps:put(PacketId, Id, InFlight)}};
|
{noreply, State#state{inflight = maps:put(PacketId, Id, InFlight)}};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
lager:warning("[mqtt_postman] send message to topic: ~p, get error: ~p", [Topic, Reason]),
|
lager:warning("[endpoint_mqtt] send message to topic: ~p, get error: ~p", [Topic, Reason]),
|
||||||
{stop, Reason, State}
|
{stop, Reason, State}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info({disconnected, ReasonCode, Properties}, State = #state{status = ?CONNECTED}) ->
|
handle_info({disconnected, ReasonCode, Properties}, State = #state{status = ?CONNECTED}) ->
|
||||||
lager:debug("[mqtt_postman] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]),
|
lager:debug("[endpoint_mqtt] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]),
|
||||||
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
|
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
|
||||||
{noreply, State#state{conn_pid = undefined, status = ?DISCONNECTED}};
|
{noreply, State#state{conn_pid = undefined, status = ?DISCONNECTED}};
|
||||||
|
|
||||||
handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload}}, State = #state{status = ?CONNECTED}) ->
|
handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload}}, State = #state{status = ?CONNECTED}) ->
|
||||||
lager:debug("[mqtt_postman] Recv a publish packet: ~p, payload: ~p", [Message, Payload]),
|
lager:debug("[endpoint_mqtt] Recv a publish packet: ~p, payload: ~p", [Message, Payload]),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
%% 收到确认的消息
|
%% 收到确认的消息
|
||||||
@ -164,12 +164,12 @@ handle_info({puback, #{packet_id := PacketId}}, State = #state{status = ?CONNECT
|
|||||||
|
|
||||||
%% postman进程挂掉时,重新建立新的
|
%% postman进程挂掉时,重新建立新的
|
||||||
handle_info({'EXIT', ConnPid, Reason}, State = #state{endpoint = #endpoint{title = Title}, conn_pid = ConnPid}) ->
|
handle_info({'EXIT', ConnPid, Reason}, State = #state{endpoint = #endpoint{title = Title}, conn_pid = ConnPid}) ->
|
||||||
lager:warning("[enpoint_mqtt] endpoint: ~p, conn pid exit with reason: ~p", [Title, Reason]),
|
lager:warning("[endpoint_mqtt] endpoint: ~p, conn pid exit with reason: ~p", [Title, Reason]),
|
||||||
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
|
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
|
||||||
{noreply, State#state{conn_pid = undefined, status = ?DISCONNECTED}};
|
{noreply, State#state{conn_pid = undefined, status = ?DISCONNECTED}};
|
||||||
|
|
||||||
handle_info(Info, State = #state{status = Status}) ->
|
handle_info(Info, State = #state{status = Status}) ->
|
||||||
lager:warning("[iot_endpoint] unknown message: ~p, status: ~p", [Info, Status]),
|
lager:warning("[endpoint_mqtt] unknown message: ~p, status: ~p", [Info, Status]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
@ -180,7 +180,7 @@ handle_info(Info, State = #state{status = Status}) ->
|
|||||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||||
State :: #state{}) -> term()).
|
State :: #state{}) -> term()).
|
||||||
terminate(Reason, #state{endpoint = #endpoint{title = Title}, buffer = Buffer}) ->
|
terminate(Reason, #state{endpoint = #endpoint{title = Title}, buffer = Buffer}) ->
|
||||||
lager:debug("[iot_endpoint] endpoint: ~p, terminate with reason: ~p", [Title, Reason]),
|
lager:debug("[endpoint_mqtt] endpoint: ~p, terminate with reason: ~p", [Title, Reason]),
|
||||||
endpoint_buffer:cleanup(Buffer),
|
endpoint_buffer:cleanup(Buffer),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|||||||
@ -19,27 +19,23 @@ handle_request("POST", "/service/push_config", _,
|
|||||||
when is_binary(UUID), is_binary(ServiceId), is_binary(ConfigJson), is_integer(Timeout0) ->
|
when is_binary(UUID), is_binary(ServiceId), is_binary(ConfigJson), is_integer(Timeout0) ->
|
||||||
|
|
||||||
%% 检查ConfigJson是否是合法的json字符串
|
%% 检查ConfigJson是否是合法的json字符串
|
||||||
case iot_util:is_json(ConfigJson) of
|
true = iot_util:is_json(ConfigJson),
|
||||||
true ->
|
case iot_host:get_pid(UUID) of
|
||||||
case iot_host:get_pid(UUID) of
|
undefined ->
|
||||||
undefined ->
|
{ok, 200, iot_util:json_error(-1, <<"host not found">>)};
|
||||||
{ok, 200, iot_util:json_error(-1, <<"host not found">>)};
|
Pid when is_pid(Pid) ->
|
||||||
Pid when is_pid(Pid) ->
|
Timeout = Timeout0 * 1000,
|
||||||
Timeout = Timeout0 * 1000,
|
case iot_host:async_service_config(Pid, ServiceId, ConfigJson, Timeout) of
|
||||||
case iot_host:async_service_config(Pid, ServiceId, ConfigJson, Timeout) of
|
{ok, Ref} ->
|
||||||
{ok, Ref} ->
|
case iot_host:await_reply(Ref, Timeout) of
|
||||||
case iot_host:await_reply(Ref, Timeout) of
|
{ok, Result} ->
|
||||||
{ok, Result} ->
|
{ok, 200, iot_util:json_data(Result)};
|
||||||
{ok, 200, iot_util:json_data(Result)};
|
{error, Reason} ->
|
||||||
{error, Reason} ->
|
|
||||||
{ok, 200, iot_util:json_error(-1, Reason)}
|
|
||||||
end;
|
|
||||||
{error, Reason} when is_binary(Reason) ->
|
|
||||||
{ok, 200, iot_util:json_error(-1, Reason)}
|
{ok, 200, iot_util:json_error(-1, Reason)}
|
||||||
end
|
end;
|
||||||
end;
|
{error, Reason} when is_binary(Reason) ->
|
||||||
false ->
|
{ok, 200, iot_util:json_error(-1, Reason)}
|
||||||
{ok, 200, iot_util:json_error(-1, <<"config is invalid json">>)}
|
end
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% 部署微服务
|
%% 部署微服务
|
||||||
|
|||||||
@ -1,107 +0,0 @@
|
|||||||
%%%-------------------------------------------------------------------
|
|
||||||
%%% @author aresei
|
|
||||||
%%% @copyright (C) 2023, <COMPANY>
|
|
||||||
%%% @doc
|
|
||||||
%%% 支持以多进程池的方式工作
|
|
||||||
%%% @end
|
|
||||||
%%% Created : 07. 8月 2023 10:15
|
|
||||||
%%%-------------------------------------------------------------------
|
|
||||||
-module(broker_postman).
|
|
||||||
-author("aresei").
|
|
||||||
-include("iot.hrl").
|
|
||||||
|
|
||||||
-behaviour(gen_server).
|
|
||||||
|
|
||||||
%% API
|
|
||||||
-export([start_link/3]).
|
|
||||||
|
|
||||||
%% gen_server callbacks
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
|
||||||
|
|
||||||
-record(state, {
|
|
||||||
pool_pid :: pid()
|
|
||||||
}).
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% API
|
|
||||||
%%%===================================================================
|
|
||||||
|
|
||||||
%% @doc Spawns the server and registers the local name (unique)
|
|
||||||
-spec(start_link(Mod :: atom(), WorkerArgs :: list(), PoolSize :: integer()) ->
|
|
||||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
|
||||||
start_link(Mod, WorkerArgs, PoolSize) when is_atom(Mod), is_list(WorkerArgs), is_integer(PoolSize) ->
|
|
||||||
gen_server:start_link(?MODULE, [Mod, WorkerArgs, PoolSize], []).
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% gen_server callbacks
|
|
||||||
%%%===================================================================
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Initializes the server
|
|
||||||
-spec(init(Args :: term()) ->
|
|
||||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term()} | ignore).
|
|
||||||
init([Mod, WorkerArgs, PoolSize]) ->
|
|
||||||
%% 启动工作的线程池
|
|
||||||
{ok, PoolPid} = poolboy:start_link([{size, PoolSize}, {max_overflow, PoolSize}, {worker_module, Mod}], WorkerArgs),
|
|
||||||
|
|
||||||
{ok, #state{pool_pid = PoolPid}}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling call messages
|
|
||||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
|
||||||
State :: #state{}) ->
|
|
||||||
{reply, Reply :: term(), NewState :: #state{}} |
|
|
||||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_call(_Request, _From, State = #state{}) ->
|
|
||||||
{reply, ok, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling cast messages
|
|
||||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_cast(_Request, State = #state{}) ->
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling all non call/cast messages
|
|
||||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_info({post, ReceiverPid, PostData}, State = #state{pool_pid = PoolPid}) ->
|
|
||||||
poolboy:transaction(PoolPid, fun(Pid) -> Pid ! {post, ReceiverPid, PostData} end),
|
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
handle_info(stop, State = #state{pool_pid = PoolPid}) ->
|
|
||||||
catch poolboy:stop(PoolPid),
|
|
||||||
{stop, normal, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc This function is called by a gen_server when it is about to
|
|
||||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
|
||||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
|
||||||
%% with Reason. The return value is ignored.
|
|
||||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
|
||||||
State :: #state{}) -> term()).
|
|
||||||
terminate(Reason, _State = #state{}) ->
|
|
||||||
lager:debug("[broker_postman] terminate with reason: ~p", [Reason]),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Convert process state when code is changed
|
|
||||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
|
||||||
Extra :: term()) ->
|
|
||||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
|
||||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% Internal functions
|
|
||||||
%%%===================================================================
|
|
||||||
@ -1,118 +0,0 @@
|
|||||||
%%%-------------------------------------------------------------------
|
|
||||||
%%% @author aresei
|
|
||||||
%%% @copyright (C) 2023, <COMPANY>
|
|
||||||
%%% @doc
|
|
||||||
%%%
|
|
||||||
%%% @end
|
|
||||||
%%% Created : 06. 7月 2023 16:23
|
|
||||||
%%%-------------------------------------------------------------------
|
|
||||||
-module(http_postman).
|
|
||||||
-author("aresei").
|
|
||||||
-include("iot.hrl").
|
|
||||||
|
|
||||||
-behaviour(gen_server).
|
|
||||||
|
|
||||||
%% API
|
|
||||||
-export([start_link/1]).
|
|
||||||
|
|
||||||
%% gen_server callbacks
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
|
||||||
|
|
||||||
-record(state, {
|
|
||||||
url :: binary()
|
|
||||||
}).
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% API
|
|
||||||
%%%===================================================================
|
|
||||||
|
|
||||||
%% 为了方便通过poolboy调用,采用的proplist
|
|
||||||
%% @doc Spawns the server and registers the local name (unique)
|
|
||||||
-spec(start_link(Args :: proplists:proplist()) ->
|
|
||||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
|
||||||
start_link(Args) when is_list(Args) ->
|
|
||||||
gen_server:start_link(?MODULE, [Args], []).
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% gen_server callbacks
|
|
||||||
%%%===================================================================
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Initializes the server
|
|
||||||
-spec(init(Args :: term()) ->
|
|
||||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term()} | ignore).
|
|
||||||
init([Args]) ->
|
|
||||||
Url = proplists:get_value(url, Args),
|
|
||||||
{ok, #state{url = Url}}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling call messages
|
|
||||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
|
||||||
State :: #state{}) ->
|
|
||||||
{reply, Reply :: term(), NewState :: #state{}} |
|
|
||||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_call(_Request, _From, State = #state{}) ->
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling cast messages
|
|
||||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_cast(_Request, State = #state{}) ->
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling all non call/cast messages
|
|
||||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_info({post, ReceiverPid, #post_data{id = Id, body = Body}}, State = #state{url = Url}) ->
|
|
||||||
Headers = [
|
|
||||||
{<<"content-type">>, <<"application/json">>}
|
|
||||||
],
|
|
||||||
case hackney:request(post, Url, Headers, Body) of
|
|
||||||
{ok, 200, _, ClientRef} ->
|
|
||||||
{ok, RespBody} = hackney:body(ClientRef),
|
|
||||||
hackney:close(ClientRef),
|
|
||||||
ReceiverPid ! {ack, Id, {ok, Body, RespBody}},
|
|
||||||
{noreply, State};
|
|
||||||
{ok, HttpCode, _, ClientRef} ->
|
|
||||||
{ok, RespBody} = hackney:body(ClientRef),
|
|
||||||
hackney:close(ClientRef),
|
|
||||||
ReceiverPid ! {ack, Id, {error, Body, {HttpCode, RespBody}}},
|
|
||||||
{noreply, State};
|
|
||||||
{error, Reason} ->
|
|
||||||
ReceiverPid ! {ack, Id, {error, Body, Reason}},
|
|
||||||
{noreply, State}
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc This function is called by a gen_server when it is about to
|
|
||||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
|
||||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
|
||||||
%% with Reason. The return value is ignored.
|
|
||||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
|
||||||
State :: #state{}) -> term()).
|
|
||||||
terminate(Reason, #state{url = Url}) ->
|
|
||||||
lager:debug("[http_postman] url: ~p, terminate with reason: ~p", [Url, Reason]),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Convert process state when code is changed
|
|
||||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
|
||||||
Extra :: term()) ->
|
|
||||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
|
||||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% Internal functions
|
|
||||||
%%%===================================================================
|
|
||||||
@ -1,147 +0,0 @@
|
|||||||
%%%-------------------------------------------------------------------
|
|
||||||
%%% @author aresei
|
|
||||||
%%% @copyright (C) 2023, <COMPANY>
|
|
||||||
%%% @doc
|
|
||||||
%%%
|
|
||||||
%%% @end
|
|
||||||
%%% Created : 12. 3月 2023 21:27
|
|
||||||
%%%-------------------------------------------------------------------
|
|
||||||
-module(mqtt_postman).
|
|
||||||
-author("aresei").
|
|
||||||
-include("iot.hrl").
|
|
||||||
|
|
||||||
-behaviour(gen_server).
|
|
||||||
|
|
||||||
%% API
|
|
||||||
-export([start_link/3]).
|
|
||||||
|
|
||||||
%% gen_server callbacks
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
|
||||||
|
|
||||||
-record(state, {
|
|
||||||
conn_pid :: pid(),
|
|
||||||
topic :: binary(),
|
|
||||||
qos = 0 :: integer(),
|
|
||||||
inflight = #{}
|
|
||||||
}).
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% API
|
|
||||||
%%%===================================================================
|
|
||||||
|
|
||||||
%% @doc Spawns the server and registers the local name (unique)
|
|
||||||
-spec(start_link(Opts :: list(), Topic :: binary(), Qos :: integer()) ->
|
|
||||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
|
||||||
start_link(Opts, Topic, Qos) when is_list(Opts), is_binary(Topic), Qos == 0; Qos == 1; Qos == 2 ->
|
|
||||||
gen_server:start_link(?MODULE, [Opts, Topic, Qos], []).
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% gen_server callbacks
|
|
||||||
%%%===================================================================
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Initializes the server
|
|
||||||
-spec(init(Args :: term()) ->
|
|
||||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term()} | ignore).
|
|
||||||
init([Opts, Topic, Qos]) ->
|
|
||||||
Opts1 = [{owner, self()} | Opts],
|
|
||||||
{ok, ConnPid} = emqtt:start_link(Opts1),
|
|
||||||
lager:debug("[mqtt_postman] start connect, options: ~p", [Opts1]),
|
|
||||||
{ok, _} = emqtt:connect(ConnPid, 5000),
|
|
||||||
lager:debug("[mqtt_postman] connect success, pid: ~p", [ConnPid]),
|
|
||||||
|
|
||||||
{ok, #state{conn_pid = ConnPid, topic = Topic, qos = Qos}}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling call messages
|
|
||||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
|
||||||
State :: #state{}) ->
|
|
||||||
{reply, Reply :: term(), NewState :: #state{}} |
|
|
||||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_call(_Info, _From, State) ->
|
|
||||||
{reply, ok, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling cast messages
|
|
||||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_cast(_Info, State) ->
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling all non call/cast messages
|
|
||||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_info({disconnected, ReasonCode, Properties}, State = #state{}) ->
|
|
||||||
lager:debug("[mqtt_postman] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]),
|
|
||||||
{stop, disconnected, State};
|
|
||||||
handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload}}, State = #state{conn_pid = _ConnPid}) ->
|
|
||||||
lager:debug("[mqtt_postman] Recv a publish packet: ~p, payload: ~p", [Message, Payload]),
|
|
||||||
{noreply, State};
|
|
||||||
handle_info({puback, #{packet_id := PacketId}}, State = #state{inflight = Inflight}) ->
|
|
||||||
case maps:take(PacketId, Inflight) of
|
|
||||||
{{Id, ReceiverPid, AssocMessage}, RestInflight} ->
|
|
||||||
ReceiverPid ! {ack, Id, AssocMessage},
|
|
||||||
{noreply, State#state{inflight = RestInflight}};
|
|
||||||
error ->
|
|
||||||
{noreply, State}
|
|
||||||
end;
|
|
||||||
|
|
||||||
%% 转发信息
|
|
||||||
handle_info({post, ReceiverPid, #post_data{id = Id, location_code = LocationCode, body = Message}},
|
|
||||||
State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) ->
|
|
||||||
|
|
||||||
Topic = re:replace(Topic0, <<"\\${location_code}">>, LocationCode, [global, {return, binary}]),
|
|
||||||
lager:debug("[mqtt_postman] will publish topic: ~p, message: ~ts, qos: ~p", [Topic, Message, Qos]),
|
|
||||||
case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of
|
|
||||||
ok ->
|
|
||||||
ReceiverPid ! {ack, Id, Message},
|
|
||||||
{noreply, State};
|
|
||||||
{ok, PacketId} ->
|
|
||||||
{noreply, State#state{inflight = maps:put(PacketId, {Id, ReceiverPid, Message}, InFlight)}};
|
|
||||||
{error, Reason} ->
|
|
||||||
lager:warning("[mqtt_postman] send message to topic: ~p, get error: ~p", [Topic, Reason]),
|
|
||||||
{stop, Reason, State}
|
|
||||||
end;
|
|
||||||
handle_info(stop, State) ->
|
|
||||||
{stop, normal, State};
|
|
||||||
|
|
||||||
handle_info(Info, State = #state{}) ->
|
|
||||||
lager:notice("[mqtt_postman] get unknown info: ~p", [Info]),
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc This function is called by a gen_server when it is about to
|
|
||||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
|
||||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
|
||||||
%% with Reason. The return value is ignored.
|
|
||||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
|
||||||
State :: #state{}) -> term()).
|
|
||||||
terminate(Reason, #state{conn_pid = ConnPid}) when is_pid(ConnPid) ->
|
|
||||||
ok = emqtt:disconnect(ConnPid),
|
|
||||||
lager:debug("[mqtt_postman] terminate with reason: ~p", [Reason]),
|
|
||||||
ok;
|
|
||||||
terminate(Reason, _State) ->
|
|
||||||
lager:debug("[mqtt_postman] terminate with reason: ~p", [Reason]),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Convert process state when code is changed
|
|
||||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
|
||||||
Extra :: term()) ->
|
|
||||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
|
||||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% Internal functions
|
|
||||||
%%%===================================================================
|
|
||||||
@ -1,116 +0,0 @@
|
|||||||
%%%-------------------------------------------------------------------
|
|
||||||
%%% @author aresei
|
|
||||||
%%% @copyright (C) 2023, <COMPANY>
|
|
||||||
%%% @doc
|
|
||||||
%%%
|
|
||||||
%%% @end
|
|
||||||
%%% Created : 07. 8月 2023 10:15
|
|
||||||
%%%-------------------------------------------------------------------
|
|
||||||
-module(mysql_postman).
|
|
||||||
-author("aresei").
|
|
||||||
-include("iot.hrl").
|
|
||||||
|
|
||||||
-behaviour(gen_server).
|
|
||||||
|
|
||||||
%% API
|
|
||||||
-export([start_link/1]).
|
|
||||||
|
|
||||||
%% gen_server callbacks
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
|
||||||
|
|
||||||
-record(state, {
|
|
||||||
mysql_pid :: pid(),
|
|
||||||
table :: binary()
|
|
||||||
}).
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% API
|
|
||||||
%%%===================================================================
|
|
||||||
|
|
||||||
%% @doc Spawns the server and registers the local name (unique)
|
|
||||||
-spec(start_link(Args :: list()) ->
|
|
||||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
|
||||||
start_link(Args) when is_list(Args) ->
|
|
||||||
gen_server:start_link(?MODULE, [Args], []).
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% gen_server callbacks
|
|
||||||
%%%===================================================================
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Initializes the server
|
|
||||||
-spec(init(Args :: term()) ->
|
|
||||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term()} | ignore).
|
|
||||||
init([Args]) ->
|
|
||||||
MysqlOpts = proplists:get_value(mysql_opts, Args),
|
|
||||||
Table = proplists:get_value(table, Args),
|
|
||||||
|
|
||||||
{ok, ConnPid} = mysql:start_link(MysqlOpts),
|
|
||||||
|
|
||||||
{ok, #state{mysql_pid = ConnPid, table = Table}}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling call messages
|
|
||||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
|
||||||
State :: #state{}) ->
|
|
||||||
{reply, Reply :: term(), NewState :: #state{}} |
|
|
||||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_call(_Request, _From, State = #state{}) ->
|
|
||||||
{reply, ok, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling cast messages
|
|
||||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_cast(_Request, State = #state{}) ->
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling all non call/cast messages
|
|
||||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_info({post, ReceiverPid, #post_data{id = Id, body = Fields}}, State = #state{mysql_pid = ConnPid, table = Table}) when is_list(Fields) ->
|
|
||||||
case catch mysql_provider:insert(ConnPid, Table, Fields, false) of
|
|
||||||
ok ->
|
|
||||||
ReceiverPid ! {ack, Id};
|
|
||||||
Error ->
|
|
||||||
lager:debug("[mysql_postman] insert table: ~p, res is: ~p", [Table, Error])
|
|
||||||
end,
|
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
handle_info(stop, State = #state{mysql_pid = ConnPid}) ->
|
|
||||||
mysql:stop(ConnPid),
|
|
||||||
{stop, normal, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc This function is called by a gen_server when it is about to
|
|
||||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
|
||||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
|
||||||
%% with Reason. The return value is ignored.
|
|
||||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
|
||||||
State :: #state{}) -> term()).
|
|
||||||
terminate(_Reason, _State = #state{}) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Convert process state when code is changed
|
|
||||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
|
||||||
Extra :: term()) ->
|
|
||||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
|
||||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% Internal functions
|
|
||||||
%%%===================================================================
|
|
||||||
Loading…
x
Reference in New Issue
Block a user