This commit is contained in:
安礼成 2023-03-06 16:05:34 +08:00
parent 9e32c0b196
commit f593aa00de
5 changed files with 141 additions and 65 deletions

View File

@ -0,0 +1,119 @@
%%%-------------------------------------------------------------------
%%% @author licheng5
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 06. 3 2023 15:42
%%%-------------------------------------------------------------------
-module(iot_emqtt_client).
-author("licheng5").
-behaviour(gen_server).
%% API
-export([start_link/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-export([publish/3]).
-define(SERVER, ?MODULE).
-record(state, {
conn_pid :: pid()
}).
%%%===================================================================
%%% API
%%%===================================================================
publish(Topic, Message, Qos) when is_binary(Topic), is_binary(Message), is_integer(Qos), Qos == 0; Qos == 1 ->
gen_server:call(?SERVER, {publish, Topic, Message, Qos}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
{ok, ServerOpts} = application:get_env(iot, emqx_server),
{ok, ConnPid} = emqtt:start_link([{clientid, iot_emqtt_client}, {owner, self()}|ServerOpts]),
{ok, _Props} = emqtt:connect(ConnPid),
SubOpts = [{qos, 1}],
{ok, _Props, _ReasonCodes} = emqtt:subscribe(ConnPid, #{}, [{<<"hello">>, SubOpts}]),
{ok, #state{}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call({publish, Topic, Message, Qos}, _From, State = #state{conn_pid = ConnPid}) ->
if
Qos == 0 ->
ok = emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, 0}]);
Qos == 1 ->
{ok, _PktId} = emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, 1}]);
true ->
ok
end,
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast(_Request, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Info, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View File

@ -35,6 +35,15 @@ init([]) ->
shutdown => 2000,
type => supervisor,
modules => ['iot_router_sup']
},
#{
id => 'iot_emqtt_client',
start => {'iot_emqtt_client', start_link, []},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['iot_emqtt_client']
}
],
{ok, {SupFlags, ChildSpecs}}.

View File

@ -5,7 +5,19 @@
{acceptors, 500},
{max_connections, 10240},
{backlog, 10240}
]},
%% 目标服务器地址
{emqx_server, [
{host, ""},
{port, 18080},
{tcp_opts, []},
{username, ""},
{password, ""},
{keepalive, 'Keepalive'},
{retry_interval, 5}
]}
]},
%% 系统日志配置系统日志为lager, 支持日志按日期自动分割

View File

@ -6,6 +6,7 @@
{cowboy, ".*", {git, "https://github.com/ninenines/cowboy.git", {tag, "2.5.0"}}},
{jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.1"}}},
{parse_trans, ".*", {git, "https://github.com/uwiger/parse_trans", {tag, "3.0.0"}}},
{emqtt, ".*", {git, "https://github.com/emqx/emqtt", {tag, "v1.2.0"}}},
{lager, ".*", {git,"https://github.com/erlang-lager/lager.git", {tag, "3.9.2"}}}
]}.

View File

@ -1,65 +0,0 @@
{"1.2.0",
[{<<"certifi">>,{pkg,<<"certifi">>,<<"2.5.2">>},1},
{<<"cowboy">>,
{git,"https://github.com/ninenines/cowboy.git",
{ref,"c998673eb009da2ea4dc0e6ef0332534cf679cc4"}},
0},
{<<"cowlib">>,
{git,"https://github.com/ninenines/cowlib",
{ref,"106ba84bb04537879d8ce59321a04e0682110b91"}},
1},
{<<"fs">>,{pkg,<<"fs">>,<<"6.1.1">>},1},
{<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1},
{<<"hackney">>,
{git,"https://github.com/benoitc/hackney.git",
{ref,"f3e9292db22c807e73f57a8422402d6b423ddf5f"}},
0},
{<<"idna">>,{pkg,<<"idna">>,<<"6.0.1">>},1},
{<<"jiffy">>,
{git,"https://github.com/davisp/jiffy.git",
{ref,"9ea1b35b6e60ba21dfd4adbd18e7916a831fd7d4"}},
0},
{<<"lager">>,
{git,"https://github.com/erlang-lager/lager.git",
{ref,"459a3b2cdd9eadd29e5a7ce5c43932f5ccd6eb88"}},
0},
{<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},1},
{<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.2.0">>},1},
{<<"parse_trans">>,
{git,"https://github.com/uwiger/parse_trans",
{ref,"6f3645afb43c7c57d61b54ef59aecab288ce1013"}},
0},
{<<"poolboy">>,
{git,"https://github.com/devinus/poolboy.git",
{ref,"3bb48a893ff5598f7c73731ac17545206d259fac"}},
0},
{<<"ranch">>,
{git,"https://github.com/ninenines/ranch",
{ref,"9b8ed47d789412b0021bfc1f94f1c17c387c721c"}},
1},
{<<"ssl_verify_fun">>,{pkg,<<"ssl_verify_fun">>,<<"1.1.6">>},1},
{<<"sync">>,
{git,"https://github.com/rustyio/sync.git",
{ref,"3f0049e809ffe303ae2cd395217a025ce6e758ae"}},
0},
{<<"unicode_util_compat">>,{pkg,<<"unicode_util_compat">>,<<"0.5.0">>},2}]}.
[
{pkg_hash,[
{<<"certifi">>, <<"B7CFEAE9D2ED395695DD8201C57A2D019C0C43ECAF8B8BCB9320B40D6662F340">>},
{<<"fs">>, <<"9D147B944D60CFA48A349F12D06C8EE71128F610C90870BDF9A6773206452ED0">>},
{<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>},
{<<"idna">>, <<"1D038FB2E7668CE41FBF681D2C45902E52B3CB9E9C77B55334353B222C2EE50C">>},
{<<"metrics">>, <<"25F094DEA2CDA98213CECC3AEFF09E940299D950904393B2A29D191C346A8486">>},
{<<"mimerl">>, <<"67E2D3F571088D5CFD3E550C383094B47159F3EEE8FFA08E64106CDF5E981BE3">>},
{<<"ssl_verify_fun">>, <<"CF344F5692C82D2CD7554F5EC8FD961548D4FD09E7D22F5B62482E5AEAEBD4B0">>},
{<<"unicode_util_compat">>, <<"8516502659002CEC19E244EBD90D312183064BE95025A319A6C7E89F4BCCD65B">>}]},
{pkg_hash_ext,[
{<<"certifi">>, <<"3B3B5F36493004AC3455966991EAF6E768CE9884693D9968055AEEEB1E575040">>},
{<<"fs">>, <<"EF94E95FFE79916860649FED80AC62B04C322B0BB70F5128144C026B4D171F8B">>},
{<<"goldrush">>, <<"99CB4128CFFCB3227581E5D4D803D5413FA643F4EB96523F77D9E6937D994CEB">>},
{<<"idna">>, <<"A02C8A1C4FD601215BB0B0324C8A6986749F807CE35F25449EC9E69758708122">>},
{<<"metrics">>, <<"69B09ADDDC4F74A40716AE54D140F93BEB0FB8978D8636EADED0C31B6F099F16">>},
{<<"mimerl">>, <<"F278585650AA581986264638EBF698F8BB19DF297F66AD91B18910DFC6E19323">>},
{<<"ssl_verify_fun">>, <<"BDB0D2471F453C88FF3908E7686F86F9BE327D065CC1EC16FA4540197EA04680">>},
{<<"unicode_util_compat">>, <<"D48D002E15F5CC105A696CF2F1BBB3FC72B4B770A184D8420C8DB20DA2674B38">>}]}
].