From d1e69a31f04dd4c7250fa6b534206a7a6890244d Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Mon, 21 Apr 2025 10:46:21 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0channel=E7=9A=84=E8=BF=87?= =?UTF-8?q?=E6=9C=9F=E6=B6=88=E6=81=AF=E6=B8=85=E7=90=86=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/iot/src/iot_host.erl | 12 +++---- apps/iot/src/websocket/ws_channel.erl | 45 +++++++++++++++++++++++---- 2 files changed, 45 insertions(+), 12 deletions(-) diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 0b83338..26c26a1 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -97,7 +97,7 @@ create_session(Pid, PubKey) when is_pid(Pid), is_binary(PubKey) -> -spec publish_message(Pid :: pid(), CommandType :: integer(), Params :: binary() | {Encrypt :: atom(), Params :: binary()}, Timeout :: integer()) -> ok | {ok, Response :: binary()} | {error, Reason :: any()}. publish_message(Pid, CommandType, Params, Timeout) when is_pid(Pid), is_integer(CommandType), is_integer(Timeout) -> - case gen_statem:call(Pid, {publish_message, self(), CommandType, Params}) of + case gen_statem:call(Pid, {publish_message, self(), CommandType, Params, Timeout}) of {ok, Ref} -> receive {ws_response, Ref} -> @@ -248,27 +248,27 @@ handle_event({call, From}, get_status, _, State = #state{host_id = HostId, chann {keep_state, State, [{reply, From, {ok, Reply}}]}; %% 发送普通格式的消息, 激活的时候,会话时创建不成功的; 发送aes类型的命令的时候,必须要求session是存在的 -handle_event({call, From}, {publish_message, ReceiverPid, CommandType, {aes, Command0}}, ?STATE_ACTIVATED, +handle_event({call, From}, {publish_message, ReceiverPid, CommandType, {aes, Command0}, Timeout}, ?STATE_ACTIVATED, State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid, has_session = true}) -> lager:debug("[iot_host] host: ~p, will publish aes message: ~p", [UUID, Command0]), Command = iot_cipher_aes:encrypt(AES, Command0), %% 通过websocket发送请求 - Ref = ws_channel:publish(ChannelPid, ReceiverPid, <>), + Ref = ws_channel:publish(ChannelPid, ReceiverPid, <>, Timeout), {keep_state, State, [{reply, From, {ok, Ref}}]}; %% 只要channel存在,就负责将消息推送到边缘端主机 -handle_event({call, From}, {publish_message, ReceiverPid, CommandType, Command}, _, +handle_event({call, From}, {publish_message, ReceiverPid, CommandType, Command, Timeout}, _, State = #state{uuid = UUID, channel_pid = ChannelPid}) when is_binary(Command), is_pid(ChannelPid) -> %% 通过websocket发送请求 lager:debug("[iot_host] host: ~p, will publish message: ~p", [UUID, Command]), - Ref = ws_channel:publish(ChannelPid, ReceiverPid, <>), + Ref = ws_channel:publish(ChannelPid, ReceiverPid, <>, Timeout), {keep_state, State, [{reply, From, {ok, Ref}}]}; -handle_event({call, From}, {publish_message, _, _, _}, _, State = #state{uuid = UUID}) -> +handle_event({call, From}, {publish_message, _, _, _, _}, _, State = #state{uuid = UUID}) -> lager:debug("[iot_host] uuid: ~p, publish_message invalid state: ~p", [UUID, state_map(State)]), {keep_state, State, [{reply, From, {error, <<"主机离线,发送命令失败"/utf8>>}}]}; diff --git a/apps/iot/src/websocket/ws_channel.erl b/apps/iot/src/websocket/ws_channel.erl index 943ec6c..65c5e3f 100644 --- a/apps/iot/src/websocket/ws_channel.erl +++ b/apps/iot/src/websocket/ws_channel.erl @@ -13,7 +13,10 @@ %% API -export([init/2]). -export([websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]). --export([publish/3, stop/2, send/2]). +-export([publish/3, publish/4, stop/2, send/2]). + +%% 每隔20秒清理一次 +-define(CLEAN_INTERVAL, 20000). -record(state, { uuid :: undefined | binary(), @@ -26,11 +29,15 @@ inflight = #{} }). -%% 向通道中写入消息 +%% 向通道中写入消息, timeout的时间为毫秒 -spec publish(Pid :: pid(), ReceiverPid :: pid(), Msg :: binary()) -> Ref :: reference(). publish(Pid, ReceiverPid, Msg) when is_pid(Pid), is_binary(Msg) -> + publish(Pid, ReceiverPid, Msg, 0). + +-spec publish(Pid :: pid(), ReceiverPid :: pid(), Msg :: binary(), Timeout :: integer()) -> Ref :: reference(). +publish(Pid, ReceiverPid, Msg, Timeout) when is_pid(Pid), is_binary(Msg), is_integer(Timeout) -> Ref = make_ref(), - Pid ! {publish, ReceiverPid, Ref, Msg}, + Pid ! {publish, ReceiverPid, Ref, Msg, Timeout}, Ref. %% 向通道中写入消息 @@ -54,6 +61,7 @@ init(Req, Opts) -> websocket_init(_State) -> lager:debug("[ws_channel] get a new connection"), + clean_ticker(), %% 初始状态为true {ok, #state{packet_id = 1}}. @@ -150,14 +158,36 @@ websocket_info({stop, Reason}, State) -> {stop, State}; %% 发送消息 -websocket_info({publish, ReceiverPid, Ref, Msg}, State = #state{packet_id = PacketId, inflight = Inflight}) when is_binary(Msg) -> - NInflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight), +websocket_info({publish, ReceiverPid, Ref, Msg, Timeout}, State = #state{packet_id = PacketId, inflight = Inflight}) when is_binary(Msg) -> + NInflight = case Timeout > 0 of + true -> + TTL = iot_util:timestamp() + Timeout, + maps:put(PacketId, {ReceiverPid, Ref, TTL}, Inflight); + false -> + maps:put(PacketId, {ReceiverPid, Ref}, Inflight) + end, {reply, {binary, <>}, State#state{packet_id = PacketId + 1, inflight = NInflight}}; %% 发送消息, 不需要等待回复 websocket_info({send, Msg}, State) when is_binary(Msg) -> {reply, {binary, <>}, State}; +%% 定期清理消息 +websocket_info({timeout, _, clean_ticker}, State=#state{inflight = Inflight}) -> + clean_ticker(), + + Timestamp = iot_util:timestamp(), + NInflight = maps:filter(fun(_, Promise) -> + case Promise of + {_ReceiverPid, _Ref, TTL} -> + TTL < Timestamp; + {_ReceiverPid, _Ref} -> + true + end + end, Inflight), + + {ok, State#state{inflight = NInflight}}; + %% 用户进程关闭,则关闭通道 websocket_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, host_pid = HostPid}) -> lager:debug("[ws_channel] uuid: ~p, channel will close because host exited with reason: ~p", [UUID, Reason]), @@ -171,4 +201,7 @@ websocket_info(Info, State = #state{uuid = UUID}) -> %% 进程关闭事件 terminate(Reason, _Req, State) -> lager:debug("[ws_channel] channel close with reason: ~p, state is: ~p", [Reason, State]), - ok. \ No newline at end of file + ok. + +clean_ticker() -> + erlang:start_timer(?CLEAN_INTERVAL, self(), clean_ticker).