增加channel的过期消息清理逻辑
This commit is contained in:
parent
534decb43b
commit
d1e69a31f0
@ -97,7 +97,7 @@ create_session(Pid, PubKey) when is_pid(Pid), is_binary(PubKey) ->
|
|||||||
-spec publish_message(Pid :: pid(), CommandType :: integer(), Params :: binary() | {Encrypt :: atom(), Params :: binary()}, Timeout :: integer()) ->
|
-spec publish_message(Pid :: pid(), CommandType :: integer(), Params :: binary() | {Encrypt :: atom(), Params :: binary()}, Timeout :: integer()) ->
|
||||||
ok | {ok, Response :: binary()} | {error, Reason :: any()}.
|
ok | {ok, Response :: binary()} | {error, Reason :: any()}.
|
||||||
publish_message(Pid, CommandType, Params, Timeout) when is_pid(Pid), is_integer(CommandType), is_integer(Timeout) ->
|
publish_message(Pid, CommandType, Params, Timeout) when is_pid(Pid), is_integer(CommandType), is_integer(Timeout) ->
|
||||||
case gen_statem:call(Pid, {publish_message, self(), CommandType, Params}) of
|
case gen_statem:call(Pid, {publish_message, self(), CommandType, Params, Timeout}) of
|
||||||
{ok, Ref} ->
|
{ok, Ref} ->
|
||||||
receive
|
receive
|
||||||
{ws_response, Ref} ->
|
{ws_response, Ref} ->
|
||||||
@ -248,27 +248,27 @@ handle_event({call, From}, get_status, _, State = #state{host_id = HostId, chann
|
|||||||
{keep_state, State, [{reply, From, {ok, Reply}}]};
|
{keep_state, State, [{reply, From, {ok, Reply}}]};
|
||||||
|
|
||||||
%% 发送普通格式的消息, 激活的时候,会话时创建不成功的; 发送aes类型的命令的时候,必须要求session是存在的
|
%% 发送普通格式的消息, 激活的时候,会话时创建不成功的; 发送aes类型的命令的时候,必须要求session是存在的
|
||||||
handle_event({call, From}, {publish_message, ReceiverPid, CommandType, {aes, Command0}}, ?STATE_ACTIVATED,
|
handle_event({call, From}, {publish_message, ReceiverPid, CommandType, {aes, Command0}, Timeout}, ?STATE_ACTIVATED,
|
||||||
State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid, has_session = true}) ->
|
State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid, has_session = true}) ->
|
||||||
|
|
||||||
lager:debug("[iot_host] host: ~p, will publish aes message: ~p", [UUID, Command0]),
|
lager:debug("[iot_host] host: ~p, will publish aes message: ~p", [UUID, Command0]),
|
||||||
Command = iot_cipher_aes:encrypt(AES, Command0),
|
Command = iot_cipher_aes:encrypt(AES, Command0),
|
||||||
%% 通过websocket发送请求
|
%% 通过websocket发送请求
|
||||||
Ref = ws_channel:publish(ChannelPid, ReceiverPid, <<CommandType:8, Command/binary>>),
|
Ref = ws_channel:publish(ChannelPid, ReceiverPid, <<CommandType:8, Command/binary>>, Timeout),
|
||||||
|
|
||||||
{keep_state, State, [{reply, From, {ok, Ref}}]};
|
{keep_state, State, [{reply, From, {ok, Ref}}]};
|
||||||
|
|
||||||
%% 只要channel存在,就负责将消息推送到边缘端主机
|
%% 只要channel存在,就负责将消息推送到边缘端主机
|
||||||
handle_event({call, From}, {publish_message, ReceiverPid, CommandType, Command}, _,
|
handle_event({call, From}, {publish_message, ReceiverPid, CommandType, Command, Timeout}, _,
|
||||||
State = #state{uuid = UUID, channel_pid = ChannelPid}) when is_binary(Command), is_pid(ChannelPid) ->
|
State = #state{uuid = UUID, channel_pid = ChannelPid}) when is_binary(Command), is_pid(ChannelPid) ->
|
||||||
|
|
||||||
%% 通过websocket发送请求
|
%% 通过websocket发送请求
|
||||||
lager:debug("[iot_host] host: ~p, will publish message: ~p", [UUID, Command]),
|
lager:debug("[iot_host] host: ~p, will publish message: ~p", [UUID, Command]),
|
||||||
Ref = ws_channel:publish(ChannelPid, ReceiverPid, <<CommandType:8, Command/binary>>),
|
Ref = ws_channel:publish(ChannelPid, ReceiverPid, <<CommandType:8, Command/binary>>, Timeout),
|
||||||
|
|
||||||
{keep_state, State, [{reply, From, {ok, Ref}}]};
|
{keep_state, State, [{reply, From, {ok, Ref}}]};
|
||||||
|
|
||||||
handle_event({call, From}, {publish_message, _, _, _}, _, State = #state{uuid = UUID}) ->
|
handle_event({call, From}, {publish_message, _, _, _, _}, _, State = #state{uuid = UUID}) ->
|
||||||
lager:debug("[iot_host] uuid: ~p, publish_message invalid state: ~p", [UUID, state_map(State)]),
|
lager:debug("[iot_host] uuid: ~p, publish_message invalid state: ~p", [UUID, state_map(State)]),
|
||||||
{keep_state, State, [{reply, From, {error, <<"主机离线,发送命令失败"/utf8>>}}]};
|
{keep_state, State, [{reply, From, {error, <<"主机离线,发送命令失败"/utf8>>}}]};
|
||||||
|
|
||||||
|
|||||||
@ -13,7 +13,10 @@
|
|||||||
%% API
|
%% API
|
||||||
-export([init/2]).
|
-export([init/2]).
|
||||||
-export([websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]).
|
-export([websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]).
|
||||||
-export([publish/3, stop/2, send/2]).
|
-export([publish/3, publish/4, stop/2, send/2]).
|
||||||
|
|
||||||
|
%% 每隔20秒清理一次
|
||||||
|
-define(CLEAN_INTERVAL, 20000).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
uuid :: undefined | binary(),
|
uuid :: undefined | binary(),
|
||||||
@ -26,11 +29,15 @@
|
|||||||
inflight = #{}
|
inflight = #{}
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%% 向通道中写入消息
|
%% 向通道中写入消息, timeout的时间为毫秒
|
||||||
-spec publish(Pid :: pid(), ReceiverPid :: pid(), Msg :: binary()) -> Ref :: reference().
|
-spec publish(Pid :: pid(), ReceiverPid :: pid(), Msg :: binary()) -> Ref :: reference().
|
||||||
publish(Pid, ReceiverPid, Msg) when is_pid(Pid), is_binary(Msg) ->
|
publish(Pid, ReceiverPid, Msg) when is_pid(Pid), is_binary(Msg) ->
|
||||||
|
publish(Pid, ReceiverPid, Msg, 0).
|
||||||
|
|
||||||
|
-spec publish(Pid :: pid(), ReceiverPid :: pid(), Msg :: binary(), Timeout :: integer()) -> Ref :: reference().
|
||||||
|
publish(Pid, ReceiverPid, Msg, Timeout) when is_pid(Pid), is_binary(Msg), is_integer(Timeout) ->
|
||||||
Ref = make_ref(),
|
Ref = make_ref(),
|
||||||
Pid ! {publish, ReceiverPid, Ref, Msg},
|
Pid ! {publish, ReceiverPid, Ref, Msg, Timeout},
|
||||||
Ref.
|
Ref.
|
||||||
|
|
||||||
%% 向通道中写入消息
|
%% 向通道中写入消息
|
||||||
@ -54,6 +61,7 @@ init(Req, Opts) ->
|
|||||||
|
|
||||||
websocket_init(_State) ->
|
websocket_init(_State) ->
|
||||||
lager:debug("[ws_channel] get a new connection"),
|
lager:debug("[ws_channel] get a new connection"),
|
||||||
|
clean_ticker(),
|
||||||
%% 初始状态为true
|
%% 初始状态为true
|
||||||
{ok, #state{packet_id = 1}}.
|
{ok, #state{packet_id = 1}}.
|
||||||
|
|
||||||
@ -150,14 +158,36 @@ websocket_info({stop, Reason}, State) ->
|
|||||||
{stop, State};
|
{stop, State};
|
||||||
|
|
||||||
%% 发送消息
|
%% 发送消息
|
||||||
websocket_info({publish, ReceiverPid, Ref, Msg}, State = #state{packet_id = PacketId, inflight = Inflight}) when is_binary(Msg) ->
|
websocket_info({publish, ReceiverPid, Ref, Msg, Timeout}, State = #state{packet_id = PacketId, inflight = Inflight}) when is_binary(Msg) ->
|
||||||
NInflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight),
|
NInflight = case Timeout > 0 of
|
||||||
|
true ->
|
||||||
|
TTL = iot_util:timestamp() + Timeout,
|
||||||
|
maps:put(PacketId, {ReceiverPid, Ref, TTL}, Inflight);
|
||||||
|
false ->
|
||||||
|
maps:put(PacketId, {ReceiverPid, Ref}, Inflight)
|
||||||
|
end,
|
||||||
{reply, {binary, <<?PACKET_PUBLISH, PacketId:32, Msg/binary>>}, State#state{packet_id = PacketId + 1, inflight = NInflight}};
|
{reply, {binary, <<?PACKET_PUBLISH, PacketId:32, Msg/binary>>}, State#state{packet_id = PacketId + 1, inflight = NInflight}};
|
||||||
|
|
||||||
%% 发送消息, 不需要等待回复
|
%% 发送消息, 不需要等待回复
|
||||||
websocket_info({send, Msg}, State) when is_binary(Msg) ->
|
websocket_info({send, Msg}, State) when is_binary(Msg) ->
|
||||||
{reply, {binary, <<?PACKET_PUBLISH, 0:32, Msg/binary>>}, State};
|
{reply, {binary, <<?PACKET_PUBLISH, 0:32, Msg/binary>>}, State};
|
||||||
|
|
||||||
|
%% 定期清理消息
|
||||||
|
websocket_info({timeout, _, clean_ticker}, State=#state{inflight = Inflight}) ->
|
||||||
|
clean_ticker(),
|
||||||
|
|
||||||
|
Timestamp = iot_util:timestamp(),
|
||||||
|
NInflight = maps:filter(fun(_, Promise) ->
|
||||||
|
case Promise of
|
||||||
|
{_ReceiverPid, _Ref, TTL} ->
|
||||||
|
TTL < Timestamp;
|
||||||
|
{_ReceiverPid, _Ref} ->
|
||||||
|
true
|
||||||
|
end
|
||||||
|
end, Inflight),
|
||||||
|
|
||||||
|
{ok, State#state{inflight = NInflight}};
|
||||||
|
|
||||||
%% 用户进程关闭,则关闭通道
|
%% 用户进程关闭,则关闭通道
|
||||||
websocket_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, host_pid = HostPid}) ->
|
websocket_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, host_pid = HostPid}) ->
|
||||||
lager:debug("[ws_channel] uuid: ~p, channel will close because host exited with reason: ~p", [UUID, Reason]),
|
lager:debug("[ws_channel] uuid: ~p, channel will close because host exited with reason: ~p", [UUID, Reason]),
|
||||||
@ -172,3 +202,6 @@ websocket_info(Info, State = #state{uuid = UUID}) ->
|
|||||||
terminate(Reason, _Req, State) ->
|
terminate(Reason, _Req, State) ->
|
||||||
lager:debug("[ws_channel] channel close with reason: ~p, state is: ~p", [Reason, State]),
|
lager:debug("[ws_channel] channel close with reason: ~p, state is: ~p", [Reason, State]),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
clean_ticker() ->
|
||||||
|
erlang:start_timer(?CLEAN_INTERVAL, self(), clean_ticker).
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user