fix
This commit is contained in:
parent
bda668eeed
commit
a4c8a0554a
@ -21,17 +21,14 @@
|
|||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
%% 消息的qos等级
|
|
||||||
-define(QOS, 1).
|
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
host :: #host{},
|
host :: #host{},
|
||||||
emqx_pid :: pid()
|
emqx_pid :: pid()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
test(Id) when is_binary(Id) ->
|
test(Msg) when is_binary(Msg) ->
|
||||||
Pid = get_pid(<<"1">>),
|
Pid = get_pid(<<"1">>),
|
||||||
publish(Pid, <<"hello world">>).
|
publish(Pid, Msg).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% API
|
%%% API
|
||||||
@ -110,7 +107,7 @@ init([Host = #host{host_id = HostId}]) ->
|
|||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
handle_call({publish, Message}, _From, State = #state{emqx_pid = ConnPid, host = #host{host_id = HostId}}) ->
|
handle_call({publish, Message}, _From, State = #state{emqx_pid = ConnPid, host = #host{host_id = HostId}}) ->
|
||||||
Topic = <<"/host/", HostId/binary, "/downstream">>,
|
Topic = <<"/host/", HostId/binary, "/downstream">>,
|
||||||
Result = emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, ?QOS}]),
|
Result = emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, 2}]),
|
||||||
{reply, Result, State}.
|
{reply, Result, State}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
@ -133,7 +130,7 @@ handle_info({timeout, _, subscribe_ticker}, State = #state{host = #host{host_id
|
|||||||
%% 监听和host相关的全部事件
|
%% 监听和host相关的全部事件
|
||||||
{ok, _} = emqtt:connect(ConnPid),
|
{ok, _} = emqtt:connect(ConnPid),
|
||||||
Topics = [
|
Topics = [
|
||||||
{<<"/host/", HostId/binary, "/upstream">>, ?QOS}
|
{<<"/host/", HostId/binary, "/upstream">>, 1}
|
||||||
],
|
],
|
||||||
SubscribeResult = emqtt:subscribe(ConnPid, Topics),
|
SubscribeResult = emqtt:subscribe(ConnPid, Topics),
|
||||||
lager:debug("[iot_host] host_id: ~p, subscribe result is: ~p", [HostId, SubscribeResult]),
|
lager:debug("[iot_host] host_id: ~p, subscribe result is: ~p", [HostId, SubscribeResult]),
|
||||||
@ -144,12 +141,13 @@ handle_info({disconnect, ReasonCode, Properties}, State = #state{host = #host{ho
|
|||||||
|
|
||||||
lager:debug("[iot_host] host: ~p, Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [HostId, ReasonCode, Properties]),
|
lager:debug("[iot_host] host: ~p, Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [HostId, ReasonCode, Properties]),
|
||||||
{stop, disconnected, State};
|
{stop, disconnected, State};
|
||||||
handle_info({publish, Message = #{packet_id := PacketId, payload := Payload}}, State = #state{emqx_pid = ConnPid, host = #host{host_id = HostId}}) ->
|
handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload}}, State = #state{emqx_pid = _ConnPid, host = #host{host_id = HostId}}) ->
|
||||||
lager:debug("[iot_host] host: ~p, Recv a publish packet: ~p, payload: ~p", [HostId, Message, Payload]),
|
lager:debug("[iot_host] host: ~p, Recv a publish packet: ~p, payload: ~p", [HostId, Message, Payload]),
|
||||||
% emqtt:pubrec(ConnPid, PacketId),
|
% emqtt:pubrec(ConnPid, PacketId),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
handle_info({puback, #{packet_id := PacketId, reason_code := ReasonCode}}, State = #state{host = #host{host_id = HostId}}) ->
|
handle_info({puback, Msg = #{packet_id := PacketId, reason_code := ReasonCode}}, State = #state{emqx_pid = ConnPid, host = #host{host_id = HostId}}) ->
|
||||||
lager:debug("[iot_host] host: ~p, receive puback packet_id: ~p, reason_code: ~p", [HostId, PacketId, ReasonCode]),
|
lager:debug("[iot_host] host: ~p, receive puback packet_id: ~p, reason_code: ~p, msg: ~p", [HostId, PacketId, ReasonCode, Msg]),
|
||||||
|
emqtt:pubrel(ConnPid, PacketId),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_info(Info, State = #state{host = #host{host_id = HostId}}) ->
|
handle_info(Info, State = #state{host = #host{host_id = HostId}}) ->
|
||||||
|
|||||||
@ -20,9 +20,6 @@
|
|||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
%% 消息的qos等级
|
|
||||||
-define(QOS, 1).
|
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
host_id :: binary(),
|
host_id :: binary(),
|
||||||
emqx_pid :: pid()
|
emqx_pid :: pid()
|
||||||
@ -73,16 +70,16 @@ init([HostId]) ->
|
|||||||
|
|
||||||
case emqtt:start_link(Opts) of
|
case emqtt:start_link(Opts) of
|
||||||
{ok, ConnPid} ->
|
{ok, ConnPid} ->
|
||||||
lager:debug("[iot_host] connect success, pid: ~p", [ConnPid]),
|
lager:debug("[iot_host_mocker] connect success, pid: ~p", [ConnPid]),
|
||||||
%% 快速启动避免阻塞iot_host_sup的启动
|
%% 快速启动避免阻塞iot_host_mocker_sup的启动
|
||||||
erlang:start_timer(0, self(), subscribe_ticker),
|
erlang:start_timer(0, self(), subscribe_ticker),
|
||||||
|
|
||||||
{ok, #state{host_id = HostId, emqx_pid = ConnPid}};
|
{ok, #state{host_id = HostId, emqx_pid = ConnPid}};
|
||||||
ignore ->
|
ignore ->
|
||||||
lager:debug("[iot_host] connect emqx get ignore"),
|
lager:debug("[iot_host_mocker] connect emqx get ignore"),
|
||||||
{stop, ignore};
|
{stop, ignore};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
lager:debug("[iot_host] connect emqx get error: ~p", [Reason]),
|
lager:debug("[iot_host_mocker] connect emqx get error: ~p", [Reason]),
|
||||||
{stop, Reason}
|
{stop, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
@ -98,7 +95,7 @@ init([HostId]) ->
|
|||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
handle_call({publish, Message}, _From, State = #state{emqx_pid = ConnPid, host_id = HostId}) ->
|
handle_call({publish, Message}, _From, State = #state{emqx_pid = ConnPid, host_id = HostId}) ->
|
||||||
Topic = <<"/host/", HostId/binary, "/upstream">>,
|
Topic = <<"/host/", HostId/binary, "/upstream">>,
|
||||||
Result = emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, ?QOS}]),
|
Result = emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, 1}]),
|
||||||
{reply, Result, State}.
|
{reply, Result, State}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
@ -121,23 +118,33 @@ handle_info({timeout, _, subscribe_ticker}, State = #state{emqx_pid = ConnPid, h
|
|||||||
%% 监听和host相关的全部事件
|
%% 监听和host相关的全部事件
|
||||||
{ok, _} = emqtt:connect(ConnPid),
|
{ok, _} = emqtt:connect(ConnPid),
|
||||||
Topics = [
|
Topics = [
|
||||||
{<<"/host/", HostId/binary, "/downstream">>, ?QOS}
|
{<<"/host/", HostId/binary, "/downstream">>, 2}
|
||||||
],
|
],
|
||||||
SubscribeResult = emqtt:subscribe(ConnPid, Topics),
|
SubscribeResult = emqtt:subscribe(ConnPid, Topics),
|
||||||
lager:debug("[iot_host] host_id: ~p, subscribe result is: ~p", [HostId, SubscribeResult]),
|
lager:debug("[iot_host_mocker] host_id: ~p, subscribe result is: ~p", [HostId, SubscribeResult]),
|
||||||
|
|
||||||
{noreply, State#state{emqx_pid = ConnPid}};
|
{noreply, State#state{emqx_pid = ConnPid}};
|
||||||
|
|
||||||
handle_info({disconnect, ReasonCode, Properties}, State = #state{host_id = HostId}) ->
|
handle_info({disconnect, ReasonCode, Properties}, State = #state{host_id = HostId}) ->
|
||||||
|
|
||||||
lager:debug("[iot_host] host: ~p, Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [HostId, ReasonCode, Properties]),
|
lager:debug("[iot_host_mocker] host: ~p, Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [HostId, ReasonCode, Properties]),
|
||||||
{stop, disconnected, State};
|
{stop, disconnected, State};
|
||||||
handle_info({publish, Message = #{packet_id := PacketId, payload := Payload}}, State = #state{emqx_pid = ConnPid, host_id = HostId}) ->
|
%% 收到qos为2的包
|
||||||
lager:debug("[iot_host] host: ~p, Recv a publish packet: ~p, payload: ~p", [HostId, Message, Payload]),
|
handle_info({publish, Message = #{packet_id := PacketId, payload := Payload, qos := 2}}, State = #state{emqx_pid = ConnPid, host_id = HostId}) ->
|
||||||
% emqtt:pubrec(ConnPid, PacketId),
|
lager:debug("[iot_host_mocker] host: ~p, qos: 2, Recv a publish packet: ~p, payload: ~p", [HostId, Message, Payload]),
|
||||||
|
%% 回复收到的请求信息
|
||||||
|
emqtt:pubrec(ConnPid, PacketId),
|
||||||
|
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
|
handle_info({publish, Message = #{packet_id := PacketId, payload := Payload}}, State = #state{emqx_pid = _ConnPid, host_id = HostId}) ->
|
||||||
|
lager:debug("[iot_host_mocker] host: ~p, Recv a publish packet: ~p, payload: ~p, packet_id: ~p", [HostId, Message, Payload, PacketId]),
|
||||||
|
%% 回复收到的请求信息
|
||||||
|
% emqtt:puback(ConnPid, PacketId),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
handle_info({puback, #{packet_id := PacketId, reason_code := ReasonCode}}, State = #state{host_id = HostId}) ->
|
handle_info({puback, #{packet_id := PacketId, reason_code := ReasonCode}}, State = #state{host_id = HostId}) ->
|
||||||
lager:debug("[iot_host] host: ~p, receive puback packet_id: ~p, reason_code: ~p", [HostId, PacketId, ReasonCode]),
|
lager:debug("[iot_host_mocker] host: ~p, receive puback packet_id: ~p, reason_code: ~p", [HostId, PacketId, ReasonCode]),
|
||||||
|
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_info(Info, State = #state{host_id = HostId}) ->
|
handle_info(Info, State = #state{host_id = HostId}) ->
|
||||||
@ -157,7 +164,7 @@ terminate(_Reason, _State = #state{emqx_pid = ConnPid}) when is_pid(ConnPid) ->
|
|||||||
ok = emqtt:stop(ConnPid),
|
ok = emqtt:stop(ConnPid),
|
||||||
ok;
|
ok;
|
||||||
terminate(Reason, _State) ->
|
terminate(Reason, _State) ->
|
||||||
lager:debug("[iot_host] terminate with reason: ~p", [Reason]),
|
lager:debug("[iot_host_mocker] terminate with reason: ~p", [Reason]),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user