This commit is contained in:
安礼成 2023-04-17 17:28:22 +08:00
parent 2020cafc69
commit 9b2f75f9d1
3 changed files with 14 additions and 12 deletions

View File

@ -29,7 +29,7 @@ emqt_opts() ->
{username, Username}, {username, Username},
{password, Password}, {password, Password},
{keepalive, Keepalive}, {keepalive, Keepalive},
{auto_ack, false}, {auto_ack, true},
{proto_ver, v5}, {proto_ver, v5},
{retry_interval, RetryInterval} {retry_interval, RetryInterval}
]. ].

View File

@ -66,6 +66,7 @@ init([]) ->
case emqtt:start_link(Opts) of case emqtt:start_link(Opts) of
{ok, ConnPid} -> {ok, ConnPid} ->
lager:debug("[iot_mqtt_publisher] connect success, pid: ~p", [ConnPid]), lager:debug("[iot_mqtt_publisher] connect success, pid: ~p", [ConnPid]),
{ok, _} = emqtt:connect(ConnPid),
{ok, #state{conn_pid = ConnPid}}; {ok, #state{conn_pid = ConnPid}};
ignore -> ignore ->
lager:debug("[iot_mqtt_publisher] connect emqx get ignore"), lager:debug("[iot_mqtt_publisher] connect emqx get ignore"),
@ -86,6 +87,7 @@ init([]) ->
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
handle_call({publish, ReceiverPid, Topic, Message, Qos}, _From, State = #state{conn_pid = ConnPid, inflight = InFlight}) -> handle_call({publish, ReceiverPid, Topic, Message, Qos}, _From, State = #state{conn_pid = ConnPid, inflight = InFlight}) ->
lager:debug("call me publish"),
case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of
{ok, PacketId} -> {ok, PacketId} ->
Ref = make_ref(), Ref = make_ref(),

View File

@ -75,16 +75,16 @@ init([]) ->
Opts = iot_config:emqt_opts(), Opts = iot_config:emqt_opts(),
case emqtt:start_link(Opts) of case emqtt:start_link(Opts) of
{ok, ConnPid} -> {ok, ConnPid} ->
lager:debug("[iot_host] connect success, pid: ~p", [ConnPid]), lager:debug("[iot_mqtt_subscriber] connect success, pid: ~p", [ConnPid]),
%% iot_host_sup的启动 %% iot_host_sup的启动
erlang:start_timer(0, self(), subscribe_ticker), erlang:start_timer(0, self(), subscribe_ticker),
{ok, #state{conn_pid = ConnPid}}; {ok, #state{conn_pid = ConnPid}};
ignore -> ignore ->
lager:debug("[iot_host] connect emqx get ignore"), lager:debug("[iot_mqtt_subscriber] connect emqx get ignore"),
{stop, ignore}; {stop, ignore};
{error, Reason} -> {error, Reason} ->
lager:debug("[iot_host] connect emqx get error: ~p", [Reason]), lager:debug("[iot_mqtt_subscriber] connect emqx get error: ~p", [Reason]),
{stop, Reason} {stop, Reason}
end. end.
@ -121,25 +121,25 @@ handle_info({timeout, _, subscribe_ticker}, State = #state{conn_pid = ConnPid})
%% host相关的全部事件 %% host相关的全部事件
{ok, _} = emqtt:connect(ConnPid), {ok, _} = emqtt:connect(ConnPid),
Topics = [ Topics = [
{<<"$share/server/register">>, 1} {<<"$share/nodes//server/register">>, 1}
], ],
SubscribeResult = emqtt:subscribe(ConnPid, Topics), SubscribeResult = emqtt:subscribe(ConnPid, Topics),
lager:debug("[iot_mqtt_share_subscriber] subscribe result is: ~p", [SubscribeResult]), lager:debug("[iot_mqtt_subscriber] subscribe result is: ~p", [SubscribeResult]),
{noreply, State#state{conn_pid = ConnPid}}; {noreply, State#state{conn_pid = ConnPid}};
handle_info({disconnect, ReasonCode, Properties}, State = #state{}) -> handle_info({disconnect, ReasonCode, Properties}, State = #state{}) ->
lager:debug("[iot_mqtt_share_subscriber] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]), lager:debug("[iot_mqtt_subscriber] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]),
{stop, disconnected, State}; {stop, disconnected, State};
handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload}}, State = #state{conn_pid = _ConnPid}) -> handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload}}, State = #state{conn_pid = _ConnPid}) ->
lager:debug("[iot_mqtt_share_subscriber] Recv a publish packet: ~p, payload: ~p", [Message, Payload]), lager:debug("[iot_mqtt_subscriber] Recv a publish packet: ~p, payload: ~p", [Message, Payload]),
{noreply, State}; {noreply, State};
handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) -> handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) ->
lager:debug("[iot_mqtt_share_subscriber] receive puback packet: ~p", [Packet]), lager:debug("[iot_mqtt_subscriber] receive puback packet: ~p", [Packet]),
{noreply, State}; {noreply, State};
handle_info(Info, State = #state{}) -> handle_info(Info, State = #state{}) ->
lager:debug("[iot_mqtt_share_subscriber] get info: ~p", [Info]), lager:debug("[iot_mqtt_subscriber] get info: ~p", [Info]),
{noreply, State}. {noreply, State}.
%% @private %% @private
@ -153,10 +153,10 @@ terminate(Reason, _State = #state{conn_pid = ConnPid}) when is_pid(ConnPid) ->
{ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, <<"hello">>), {ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, <<"hello">>),
ok = emqtt:disconnect(ConnPid), ok = emqtt:disconnect(ConnPid),
ok = emqtt:stop(ConnPid), ok = emqtt:stop(ConnPid),
lager:debug("[iot_host] terminate with reason: ~p", [Reason]), lager:debug("[iot_mqtt_subscriber] terminate with reason: ~p", [Reason]),
ok; ok;
terminate(Reason, _State) -> terminate(Reason, _State) ->
lager:debug("[iot_host] terminate with reason: ~p", [Reason]), lager:debug("[iot_mqtt_subscriber] terminate with reason: ~p", [Reason]),
ok. ok.
%% @private %% @private