diff --git a/apps/iot/src/endpoint/endpoint_http.erl b/apps/iot/src/endpoint/endpoint_http.erl index 536a87b..fe2f77a 100644 --- a/apps/iot/src/endpoint/endpoint_http.erl +++ b/apps/iot/src/endpoint/endpoint_http.erl @@ -92,17 +92,17 @@ handle_info({next_data, Id, {ServiceId, Metric}}, State = #state{buffer = Buffer {ok, 200, _, ClientRef} -> {ok, RespBody} = hackney:body(ClientRef), hackney:close(ClientRef), - lager:debug("[http_postman] url: ~p, response is: ~p", [Url, RespBody]), + lager:debug("[endpoint_http] url: ~p, response is: ~p", [Url, RespBody]), NBuffer = endpoint_buffer:ack(Id, Buffer), {noreply, State#state{buffer = NBuffer}}; {ok, HttpCode, _, ClientRef} -> {ok, RespBody} = hackney:body(ClientRef), hackney:close(ClientRef), - lager:debug("[http_postman] url: ~p, http_code: ~p, response is: ~p", [Url, HttpCode, RespBody]), + lager:debug("[endpoint_http] url: ~p, http_code: ~p, response is: ~p", [Url, HttpCode, RespBody]), {noreply, State}; {error, Reason} -> - lager:warning("[http_postman] url: ~p, get error: ~p", [Url, Reason]), + lager:warning("[endpoint_http] url: ~p, get error: ~p", [Url, Reason]), {noreply, State} end. diff --git a/apps/iot/src/endpoint/endpoint_kafka.erl b/apps/iot/src/endpoint/endpoint_kafka.erl index 89740da..bcb92ab 100644 --- a/apps/iot/src/endpoint/endpoint_kafka.erl +++ b/apps/iot/src/endpoint/endpoint_kafka.erl @@ -93,7 +93,7 @@ handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) -> {stop, Reason :: term(), NewState :: #state{}}). handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DISCONNECTED, client_id = ClientId, endpoint = #endpoint{title = Title, config = #kafka_endpoint{username = Username, password = Password, mechanism = Mechanism, bootstrap_servers = BootstrapServers, topic = Topic}}}) -> - lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Title]), + lager:debug("[endpoint_kafka] endpoint: ~p, create postman", [Title]), ClientConfig = [ {reconnect_cool_down_seconds, 5}, diff --git a/apps/iot/src/endpoint/endpoint_mqtt.erl b/apps/iot/src/endpoint/endpoint_mqtt.erl index 2f13a01..6133b1f 100644 --- a/apps/iot/src/endpoint/endpoint_mqtt.erl +++ b/apps/iot/src/endpoint/endpoint_mqtt.erl @@ -94,7 +94,7 @@ handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) -> {stop, Reason :: term(), NewState :: #state{}}). handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status = ?DISCONNECTED, endpoint = #endpoint{title = Title, config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, client_id = ClientId}}}) -> - lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Title]), + lager:debug("[endpoint_mqtt] endpoint: ~p, create postman", [Title]), Opts = [ {owner, self()}, {clientid, ClientId}, @@ -111,14 +111,14 @@ handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status ], {ok, ConnPid} = emqtt:start_link(Opts), - lager:debug("[mqtt_postman] start connect, options: ~p", [Opts]), + lager:debug("[endpoint_mqtt] start connect, options: ~p", [Opts]), case emqtt:connect(ConnPid, 5000) of {ok, _} -> - lager:debug("[mqtt_postman] connect success, pid: ~p", [ConnPid]), + lager:debug("[endpoint_mqtt] connect success, pid: ~p", [ConnPid]), NBuffer = endpoint_buffer:trigger_n(Buffer), {noreply, State#state{conn_pid = ConnPid, buffer = NBuffer, status = ?CONNECTED}}; {error, Reason} -> - lager:warning("[mqtt_postman] connect get error: ~p", [Reason]), + lager:warning("[endpoint_mqtt] connect get error: ~p", [Reason]), erlang:start_timer(5000, self(), create_postman), {noreply, State} end; @@ -131,7 +131,7 @@ handle_info({next_data, Id, {ServiceId, Metric}}, State = #state{status = ?CONNE endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic0, qos = Qos}}}) -> Topic = re:replace(Topic0, <<"\\${service_id}">>, ServiceId, [global, {return, binary}]), - lager:debug("[mqtt_postman] will publish topic: ~p, metric: ~p, qos: ~p", [Topic, Metric, Qos]), + lager:debug("[endpoint_mqtt] will publish topic: ~p, metric: ~p, qos: ~p", [Topic, Metric, Qos]), case emqtt:publish(ConnPid, Topic, #{}, Metric, [{qos, Qos}, {retain, true}]) of ok -> NBuffer = endpoint_buffer:ack(Id, Buffer), @@ -139,17 +139,17 @@ handle_info({next_data, Id, {ServiceId, Metric}}, State = #state{status = ?CONNE {ok, PacketId} -> {noreply, State#state{inflight = maps:put(PacketId, Id, InFlight)}}; {error, Reason} -> - lager:warning("[mqtt_postman] send message to topic: ~p, get error: ~p", [Topic, Reason]), + lager:warning("[endpoint_mqtt] send message to topic: ~p, get error: ~p", [Topic, Reason]), {stop, Reason, State} end; handle_info({disconnected, ReasonCode, Properties}, State = #state{status = ?CONNECTED}) -> - lager:debug("[mqtt_postman] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]), + lager:debug("[endpoint_mqtt] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]), erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), {noreply, State#state{conn_pid = undefined, status = ?DISCONNECTED}}; handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload}}, State = #state{status = ?CONNECTED}) -> - lager:debug("[mqtt_postman] Recv a publish packet: ~p, payload: ~p", [Message, Payload]), + lager:debug("[endpoint_mqtt] Recv a publish packet: ~p, payload: ~p", [Message, Payload]), {noreply, State}; %% 收到确认的消息 @@ -164,12 +164,12 @@ handle_info({puback, #{packet_id := PacketId}}, State = #state{status = ?CONNECT %% postman进程挂掉时,重新建立新的 handle_info({'EXIT', ConnPid, Reason}, State = #state{endpoint = #endpoint{title = Title}, conn_pid = ConnPid}) -> - lager:warning("[enpoint_mqtt] endpoint: ~p, conn pid exit with reason: ~p", [Title, Reason]), + lager:warning("[endpoint_mqtt] endpoint: ~p, conn pid exit with reason: ~p", [Title, Reason]), erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), {noreply, State#state{conn_pid = undefined, status = ?DISCONNECTED}}; handle_info(Info, State = #state{status = Status}) -> - lager:warning("[iot_endpoint] unknown message: ~p, status: ~p", [Info, Status]), + lager:warning("[endpoint_mqtt] unknown message: ~p, status: ~p", [Info, Status]), {noreply, State}. %% @private @@ -180,7 +180,7 @@ handle_info(Info, State = #state{status = Status}) -> -spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), State :: #state{}) -> term()). terminate(Reason, #state{endpoint = #endpoint{title = Title}, buffer = Buffer}) -> - lager:debug("[iot_endpoint] endpoint: ~p, terminate with reason: ~p", [Title, Reason]), + lager:debug("[endpoint_mqtt] endpoint: ~p, terminate with reason: ~p", [Title, Reason]), endpoint_buffer:cleanup(Buffer), ok. diff --git a/apps/iot/src/http_handlers/service_handler.erl b/apps/iot/src/http_handlers/service_handler.erl index a2a80a7..9e511f4 100644 --- a/apps/iot/src/http_handlers/service_handler.erl +++ b/apps/iot/src/http_handlers/service_handler.erl @@ -19,27 +19,23 @@ handle_request("POST", "/service/push_config", _, when is_binary(UUID), is_binary(ServiceId), is_binary(ConfigJson), is_integer(Timeout0) -> %% 检查ConfigJson是否是合法的json字符串 - case iot_util:is_json(ConfigJson) of - true -> - case iot_host:get_pid(UUID) of - undefined -> - {ok, 200, iot_util:json_error(-1, <<"host not found">>)}; - Pid when is_pid(Pid) -> - Timeout = Timeout0 * 1000, - case iot_host:async_service_config(Pid, ServiceId, ConfigJson, Timeout) of - {ok, Ref} -> - case iot_host:await_reply(Ref, Timeout) of - {ok, Result} -> - {ok, 200, iot_util:json_data(Result)}; - {error, Reason} -> - {ok, 200, iot_util:json_error(-1, Reason)} - end; - {error, Reason} when is_binary(Reason) -> + true = iot_util:is_json(ConfigJson), + case iot_host:get_pid(UUID) of + undefined -> + {ok, 200, iot_util:json_error(-1, <<"host not found">>)}; + Pid when is_pid(Pid) -> + Timeout = Timeout0 * 1000, + case iot_host:async_service_config(Pid, ServiceId, ConfigJson, Timeout) of + {ok, Ref} -> + case iot_host:await_reply(Ref, Timeout) of + {ok, Result} -> + {ok, 200, iot_util:json_data(Result)}; + {error, Reason} -> {ok, 200, iot_util:json_error(-1, Reason)} - end - end; - false -> - {ok, 200, iot_util:json_error(-1, <<"config is invalid json">>)} + end; + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(-1, Reason)} + end end; %% 部署微服务 diff --git a/apps/iot/src/postman/broker_postman.erl b/apps/iot/src/postman/broker_postman.erl deleted file mode 100644 index 764cbca..0000000 --- a/apps/iot/src/postman/broker_postman.erl +++ /dev/null @@ -1,107 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% 支持以多进程池的方式工作 -%%% @end -%%% Created : 07. 8月 2023 10:15 -%%%------------------------------------------------------------------- --module(broker_postman). --author("aresei"). --include("iot.hrl"). - --behaviour(gen_server). - -%% API --export([start_link/3]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --record(state, { - pool_pid :: pid() -}). - -%%%=================================================================== -%%% API -%%%=================================================================== - -%% @doc Spawns the server and registers the local name (unique) --spec(start_link(Mod :: atom(), WorkerArgs :: list(), PoolSize :: integer()) -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(Mod, WorkerArgs, PoolSize) when is_atom(Mod), is_list(WorkerArgs), is_integer(PoolSize) -> - gen_server:start_link(?MODULE, [Mod, WorkerArgs, PoolSize], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([Mod, WorkerArgs, PoolSize]) -> - %% 启动工作的线程池 - {ok, PoolPid} = poolboy:start_link([{size, PoolSize}, {max_overflow, PoolSize}, {worker_module, Mod}], WorkerArgs), - - {ok, #state{pool_pid = PoolPid}}. - -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call(_Request, _From, State = #state{}) -> - {reply, ok, State}. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_cast(_Request, State = #state{}) -> - {noreply, State}. - -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_info({post, ReceiverPid, PostData}, State = #state{pool_pid = PoolPid}) -> - poolboy:transaction(PoolPid, fun(Pid) -> Pid ! {post, ReceiverPid, PostData} end), - {noreply, State}; - -handle_info(stop, State = #state{pool_pid = PoolPid}) -> - catch poolboy:stop(PoolPid), - {stop, normal, State}. - -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(Reason, _State = #state{}) -> - lager:debug("[broker_postman] terminate with reason: ~p", [Reason]), - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== diff --git a/apps/iot/src/postman/http_postman.erl b/apps/iot/src/postman/http_postman.erl deleted file mode 100644 index edbe9ba..0000000 --- a/apps/iot/src/postman/http_postman.erl +++ /dev/null @@ -1,118 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 06. 7月 2023 16:23 -%%%------------------------------------------------------------------- --module(http_postman). --author("aresei"). --include("iot.hrl"). - --behaviour(gen_server). - -%% API --export([start_link/1]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --record(state, { - url :: binary() -}). - -%%%=================================================================== -%%% API -%%%=================================================================== - -%% 为了方便通过poolboy调用,采用的proplist -%% @doc Spawns the server and registers the local name (unique) --spec(start_link(Args :: proplists:proplist()) -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(Args) when is_list(Args) -> - gen_server:start_link(?MODULE, [Args], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([Args]) -> - Url = proplists:get_value(url, Args), - {ok, #state{url = Url}}. - -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call(_Request, _From, State = #state{}) -> - {noreply, State}. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_cast(_Request, State = #state{}) -> - {noreply, State}. - -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_info({post, ReceiverPid, #post_data{id = Id, body = Body}}, State = #state{url = Url}) -> - Headers = [ - {<<"content-type">>, <<"application/json">>} - ], - case hackney:request(post, Url, Headers, Body) of - {ok, 200, _, ClientRef} -> - {ok, RespBody} = hackney:body(ClientRef), - hackney:close(ClientRef), - ReceiverPid ! {ack, Id, {ok, Body, RespBody}}, - {noreply, State}; - {ok, HttpCode, _, ClientRef} -> - {ok, RespBody} = hackney:body(ClientRef), - hackney:close(ClientRef), - ReceiverPid ! {ack, Id, {error, Body, {HttpCode, RespBody}}}, - {noreply, State}; - {error, Reason} -> - ReceiverPid ! {ack, Id, {error, Body, Reason}}, - {noreply, State} - end. - -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(Reason, #state{url = Url}) -> - lager:debug("[http_postman] url: ~p, terminate with reason: ~p", [Url, Reason]), - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== diff --git a/apps/iot/src/postman/mqtt_postman.erl b/apps/iot/src/postman/mqtt_postman.erl deleted file mode 100644 index 41752aa..0000000 --- a/apps/iot/src/postman/mqtt_postman.erl +++ /dev/null @@ -1,147 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 12. 3月 2023 21:27 -%%%------------------------------------------------------------------- --module(mqtt_postman). --author("aresei"). --include("iot.hrl"). - --behaviour(gen_server). - -%% API --export([start_link/3]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --record(state, { - conn_pid :: pid(), - topic :: binary(), - qos = 0 :: integer(), - inflight = #{} -}). - -%%%=================================================================== -%%% API -%%%=================================================================== - -%% @doc Spawns the server and registers the local name (unique) --spec(start_link(Opts :: list(), Topic :: binary(), Qos :: integer()) -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(Opts, Topic, Qos) when is_list(Opts), is_binary(Topic), Qos == 0; Qos == 1; Qos == 2 -> - gen_server:start_link(?MODULE, [Opts, Topic, Qos], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([Opts, Topic, Qos]) -> - Opts1 = [{owner, self()} | Opts], - {ok, ConnPid} = emqtt:start_link(Opts1), - lager:debug("[mqtt_postman] start connect, options: ~p", [Opts1]), - {ok, _} = emqtt:connect(ConnPid, 5000), - lager:debug("[mqtt_postman] connect success, pid: ~p", [ConnPid]), - - {ok, #state{conn_pid = ConnPid, topic = Topic, qos = Qos}}. - -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call(_Info, _From, State) -> - {reply, ok, State}. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_cast(_Info, State) -> - {noreply, State}. - -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_info({disconnected, ReasonCode, Properties}, State = #state{}) -> - lager:debug("[mqtt_postman] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]), - {stop, disconnected, State}; -handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload}}, State = #state{conn_pid = _ConnPid}) -> - lager:debug("[mqtt_postman] Recv a publish packet: ~p, payload: ~p", [Message, Payload]), - {noreply, State}; -handle_info({puback, #{packet_id := PacketId}}, State = #state{inflight = Inflight}) -> - case maps:take(PacketId, Inflight) of - {{Id, ReceiverPid, AssocMessage}, RestInflight} -> - ReceiverPid ! {ack, Id, AssocMessage}, - {noreply, State#state{inflight = RestInflight}}; - error -> - {noreply, State} - end; - -%% 转发信息 -handle_info({post, ReceiverPid, #post_data{id = Id, location_code = LocationCode, body = Message}}, - State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) -> - - Topic = re:replace(Topic0, <<"\\${location_code}">>, LocationCode, [global, {return, binary}]), - lager:debug("[mqtt_postman] will publish topic: ~p, message: ~ts, qos: ~p", [Topic, Message, Qos]), - case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of - ok -> - ReceiverPid ! {ack, Id, Message}, - {noreply, State}; - {ok, PacketId} -> - {noreply, State#state{inflight = maps:put(PacketId, {Id, ReceiverPid, Message}, InFlight)}}; - {error, Reason} -> - lager:warning("[mqtt_postman] send message to topic: ~p, get error: ~p", [Topic, Reason]), - {stop, Reason, State} - end; -handle_info(stop, State) -> - {stop, normal, State}; - -handle_info(Info, State = #state{}) -> - lager:notice("[mqtt_postman] get unknown info: ~p", [Info]), - {noreply, State}. - -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(Reason, #state{conn_pid = ConnPid}) when is_pid(ConnPid) -> - ok = emqtt:disconnect(ConnPid), - lager:debug("[mqtt_postman] terminate with reason: ~p", [Reason]), - ok; -terminate(Reason, _State) -> - lager:debug("[mqtt_postman] terminate with reason: ~p", [Reason]), - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== \ No newline at end of file diff --git a/apps/iot/src/postman/mysql_postman.erl b/apps/iot/src/postman/mysql_postman.erl deleted file mode 100644 index 9217397..0000000 --- a/apps/iot/src/postman/mysql_postman.erl +++ /dev/null @@ -1,116 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 07. 8月 2023 10:15 -%%%------------------------------------------------------------------- --module(mysql_postman). --author("aresei"). --include("iot.hrl"). - --behaviour(gen_server). - -%% API --export([start_link/1]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --define(SERVER, ?MODULE). - --record(state, { - mysql_pid :: pid(), - table :: binary() -}). - -%%%=================================================================== -%%% API -%%%=================================================================== - -%% @doc Spawns the server and registers the local name (unique) --spec(start_link(Args :: list()) -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(Args) when is_list(Args) -> - gen_server:start_link(?MODULE, [Args], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([Args]) -> - MysqlOpts = proplists:get_value(mysql_opts, Args), - Table = proplists:get_value(table, Args), - - {ok, ConnPid} = mysql:start_link(MysqlOpts), - - {ok, #state{mysql_pid = ConnPid, table = Table}}. - -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call(_Request, _From, State = #state{}) -> - {reply, ok, State}. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_cast(_Request, State = #state{}) -> - {noreply, State}. - -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_info({post, ReceiverPid, #post_data{id = Id, body = Fields}}, State = #state{mysql_pid = ConnPid, table = Table}) when is_list(Fields) -> - case catch mysql_provider:insert(ConnPid, Table, Fields, false) of - ok -> - ReceiverPid ! {ack, Id}; - Error -> - lager:debug("[mysql_postman] insert table: ~p, res is: ~p", [Table, Error]) - end, - {noreply, State}; - -handle_info(stop, State = #state{mysql_pid = ConnPid}) -> - mysql:stop(ConnPid), - {stop, normal, State}. - -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(_Reason, _State = #state{}) -> - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%===================================================================