From f593aa00de022b75fe35ce4e345f740f210c950c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Mon, 6 Mar 2023 16:05:34 +0800 Subject: [PATCH] fix --- apps/iot/src/iot_emqtt_client.erl | 119 ++++++++++++++++++++++++++++++ apps/iot/src/iot_sup.erl | 9 +++ config/sys.config | 12 +++ rebar.config | 1 + rebar.lock | 65 ---------------- 5 files changed, 141 insertions(+), 65 deletions(-) create mode 100644 apps/iot/src/iot_emqtt_client.erl delete mode 100644 rebar.lock diff --git a/apps/iot/src/iot_emqtt_client.erl b/apps/iot/src/iot_emqtt_client.erl new file mode 100644 index 0000000..30667bf --- /dev/null +++ b/apps/iot/src/iot_emqtt_client.erl @@ -0,0 +1,119 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 06. 3月 2023 15:42 +%%%------------------------------------------------------------------- +-module(iot_emqtt_client). +-author("licheng5"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-export([publish/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + conn_pid :: pid() +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +publish(Topic, Message, Qos) when is_binary(Topic), is_binary(Message), is_integer(Qos), Qos == 0; Qos == 1 -> + gen_server:call(?SERVER, {publish, Topic, Message, Qos}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + {ok, ServerOpts} = application:get_env(iot, emqx_server), + {ok, ConnPid} = emqtt:start_link([{clientid, iot_emqtt_client}, {owner, self()}|ServerOpts]), + {ok, _Props} = emqtt:connect(ConnPid), + + SubOpts = [{qos, 1}], + {ok, _Props, _ReasonCodes} = emqtt:subscribe(ConnPid, #{}, [{<<"hello">>, SubOpts}]), + + {ok, #state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call({publish, Topic, Message, Qos}, _From, State = #state{conn_pid = ConnPid}) -> + if + Qos == 0 -> + ok = emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, 0}]); + Qos == 1 -> + {ok, _PktId} = emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, 1}]); + true -> + ok + end, + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 3cf175d..924e656 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -35,6 +35,15 @@ init([]) -> shutdown => 2000, type => supervisor, modules => ['iot_router_sup'] + }, + + #{ + id => 'iot_emqtt_client', + start => {'iot_emqtt_client', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['iot_emqtt_client'] } ], {ok, {SupFlags, ChildSpecs}}. diff --git a/config/sys.config b/config/sys.config index 51ea41e..53513b0 100644 --- a/config/sys.config +++ b/config/sys.config @@ -5,7 +5,19 @@ {acceptors, 500}, {max_connections, 10240}, {backlog, 10240} + ]}, + + %% 目标服务器地址 + {emqx_server, [ + {host, ""}, + {port, 18080}, + {tcp_opts, []}, + {username, ""}, + {password, ""}, + {keepalive, 'Keepalive'}, + {retry_interval, 5} ]} + ]}, %% 系统日志配置,系统日志为lager, 支持日志按日期自动分割 diff --git a/rebar.config b/rebar.config index 089d748..e04e0d7 100644 --- a/rebar.config +++ b/rebar.config @@ -6,6 +6,7 @@ {cowboy, ".*", {git, "https://github.com/ninenines/cowboy.git", {tag, "2.5.0"}}}, {jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.1"}}}, {parse_trans, ".*", {git, "https://github.com/uwiger/parse_trans", {tag, "3.0.0"}}}, + {emqtt, ".*", {git, "https://github.com/emqx/emqtt", {tag, "v1.2.0"}}}, {lager, ".*", {git,"https://github.com/erlang-lager/lager.git", {tag, "3.9.2"}}} ]}. diff --git a/rebar.lock b/rebar.lock deleted file mode 100644 index c0d8370..0000000 --- a/rebar.lock +++ /dev/null @@ -1,65 +0,0 @@ -{"1.2.0", -[{<<"certifi">>,{pkg,<<"certifi">>,<<"2.5.2">>},1}, - {<<"cowboy">>, - {git,"https://github.com/ninenines/cowboy.git", - {ref,"c998673eb009da2ea4dc0e6ef0332534cf679cc4"}}, - 0}, - {<<"cowlib">>, - {git,"https://github.com/ninenines/cowlib", - {ref,"106ba84bb04537879d8ce59321a04e0682110b91"}}, - 1}, - {<<"fs">>,{pkg,<<"fs">>,<<"6.1.1">>},1}, - {<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1}, - {<<"hackney">>, - {git,"https://github.com/benoitc/hackney.git", - {ref,"f3e9292db22c807e73f57a8422402d6b423ddf5f"}}, - 0}, - {<<"idna">>,{pkg,<<"idna">>,<<"6.0.1">>},1}, - {<<"jiffy">>, - {git,"https://github.com/davisp/jiffy.git", - {ref,"9ea1b35b6e60ba21dfd4adbd18e7916a831fd7d4"}}, - 0}, - {<<"lager">>, - {git,"https://github.com/erlang-lager/lager.git", - {ref,"459a3b2cdd9eadd29e5a7ce5c43932f5ccd6eb88"}}, - 0}, - {<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},1}, - {<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.2.0">>},1}, - {<<"parse_trans">>, - {git,"https://github.com/uwiger/parse_trans", - {ref,"6f3645afb43c7c57d61b54ef59aecab288ce1013"}}, - 0}, - {<<"poolboy">>, - {git,"https://github.com/devinus/poolboy.git", - {ref,"3bb48a893ff5598f7c73731ac17545206d259fac"}}, - 0}, - {<<"ranch">>, - {git,"https://github.com/ninenines/ranch", - {ref,"9b8ed47d789412b0021bfc1f94f1c17c387c721c"}}, - 1}, - {<<"ssl_verify_fun">>,{pkg,<<"ssl_verify_fun">>,<<"1.1.6">>},1}, - {<<"sync">>, - {git,"https://github.com/rustyio/sync.git", - {ref,"3f0049e809ffe303ae2cd395217a025ce6e758ae"}}, - 0}, - {<<"unicode_util_compat">>,{pkg,<<"unicode_util_compat">>,<<"0.5.0">>},2}]}. -[ -{pkg_hash,[ - {<<"certifi">>, <<"B7CFEAE9D2ED395695DD8201C57A2D019C0C43ECAF8B8BCB9320B40D6662F340">>}, - {<<"fs">>, <<"9D147B944D60CFA48A349F12D06C8EE71128F610C90870BDF9A6773206452ED0">>}, - {<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>}, - {<<"idna">>, <<"1D038FB2E7668CE41FBF681D2C45902E52B3CB9E9C77B55334353B222C2EE50C">>}, - {<<"metrics">>, <<"25F094DEA2CDA98213CECC3AEFF09E940299D950904393B2A29D191C346A8486">>}, - {<<"mimerl">>, <<"67E2D3F571088D5CFD3E550C383094B47159F3EEE8FFA08E64106CDF5E981BE3">>}, - {<<"ssl_verify_fun">>, <<"CF344F5692C82D2CD7554F5EC8FD961548D4FD09E7D22F5B62482E5AEAEBD4B0">>}, - {<<"unicode_util_compat">>, <<"8516502659002CEC19E244EBD90D312183064BE95025A319A6C7E89F4BCCD65B">>}]}, -{pkg_hash_ext,[ - {<<"certifi">>, <<"3B3B5F36493004AC3455966991EAF6E768CE9884693D9968055AEEEB1E575040">>}, - {<<"fs">>, <<"EF94E95FFE79916860649FED80AC62B04C322B0BB70F5128144C026B4D171F8B">>}, - {<<"goldrush">>, <<"99CB4128CFFCB3227581E5D4D803D5413FA643F4EB96523F77D9E6937D994CEB">>}, - {<<"idna">>, <<"A02C8A1C4FD601215BB0B0324C8A6986749F807CE35F25449EC9E69758708122">>}, - {<<"metrics">>, <<"69B09ADDDC4F74A40716AE54D140F93BEB0FB8978D8636EADED0C31B6F099F16">>}, - {<<"mimerl">>, <<"F278585650AA581986264638EBF698F8BB19DF297F66AD91B18910DFC6E19323">>}, - {<<"ssl_verify_fun">>, <<"BDB0D2471F453C88FF3908E7686F86F9BE327D065CC1EC16FA4540197EA04680">>}, - {<<"unicode_util_compat">>, <<"D48D002E15F5CC105A696CF2F1BBB3FC72B4B770A184D8420C8DB20DA2674B38">>}]} -].