From 5f28301ed187a34548a179199508c8a776ada457 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 22 Dec 2023 15:19:21 +0800 Subject: [PATCH] fix ws command --- apps/iot/src/websocket/ws_channel.erl | 47 ++++++++++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/apps/iot/src/websocket/ws_channel.erl b/apps/iot/src/websocket/ws_channel.erl index 943ec6c..7a0d326 100644 --- a/apps/iot/src/websocket/ws_channel.erl +++ b/apps/iot/src/websocket/ws_channel.erl @@ -14,6 +14,7 @@ -export([init/2]). -export([websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]). -export([publish/3, stop/2, send/2]). +-export([publish_command/3, send_command/2]). -record(state, { uuid :: undefined | binary(), @@ -23,7 +24,9 @@ packet_id = 1 :: integer(), %% 请求响应的对应关系 - inflight = #{} + inflight = #{}, + %% 指令的对应关系 + cmd_inflight = #{} }). %% 向通道中写入消息 @@ -38,6 +41,20 @@ publish(Pid, ReceiverPid, Msg) when is_pid(Pid), is_binary(Msg) -> send(Pid, Msg) when is_pid(Pid), is_binary(Msg) -> Pid ! {send, Msg}. +%% 发送指令信息 + +%% 向通道中写入消息 +-spec publish_command(Pid :: pid(), ReceiverPid :: pid(), Msg :: binary()) -> Ref :: reference(). +publish_command(Pid, ReceiverPid, Cmd) when is_pid(Pid), is_binary(Cmd) -> + Ref = make_ref(), + Pid ! {publish_command, ReceiverPid, Ref, Cmd}, + Ref. + +%% 向通道中写入消息 +-spec send_command(Pid :: pid(), Cmd :: binary()) -> no_return(). +send_command(Pid, Cmd) when is_pid(Pid), is_binary(Cmd) -> + Pid ! {send_command, Cmd}. + %% 关闭方法 -spec stop(Pid :: pid(), Reason :: any()) -> no_return(). stop(undefined, _Reason) -> @@ -140,6 +157,25 @@ websocket_handle({binary, <> {ok, State#state{inflight = NInflight}} end; +%% 命令的响应 +websocket_handle({binary, <>}, State = #state{uuid = UUID, cmd_inflight = CmdInflight}) when PacketId > 0 -> + lager:debug("[ws_channel] uuid: ~p, get command response message: ~p, packet_id: ~p", [UUID, Body, PacketId]), + case maps:take(PacketId, CmdInflight) of + error -> + lager:warning("[ws_channel] get unknown command response message: ~p, packet_id: ~p", [Body, PacketId]), + {ok, State}; + {{ReceiverPid, Ref}, NCmdInflight} -> + case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of + true when Body == <<>> -> + ReceiverPid ! {cmd_response, Ref}; + true -> + ReceiverPid ! {cmd_response, Ref, Body}; + false -> + lager:warning("[ws_channel] get command response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Body, PacketId]) + end, + {ok, State#state{cmd_inflight = NCmdInflight}} + end; + websocket_handle(Info, State) -> lager:error("[ws_channel] get a unknown message: ~p, channel will closed", [Info]), {stop, State}. @@ -158,6 +194,15 @@ websocket_info({publish, ReceiverPid, Ref, Msg}, State = #state{packet_id = Pack websocket_info({send, Msg}, State) when is_binary(Msg) -> {reply, {binary, <>}, State}; +%% 下发指令 +websocket_info({publish_command, ReceiverPid, Ref, Cmd}, State = #state{packet_id = PacketId, cmd_inflight = CmdInflight}) when is_binary(Cmd) -> + NCmdInflight = maps:put(PacketId, {ReceiverPid, Ref}, CmdInflight), + {reply, {binary, <>}, State#state{packet_id = PacketId + 1, cmd_inflight = NCmdInflight}}; + +%% 发送指令, 不需要等待回复 +websocket_info({send_command, Cmd}, State) when is_binary(Cmd) -> + {reply, {binary, <>}, State}; + %% 用户进程关闭,则关闭通道 websocket_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, host_pid = HostPid}) -> lager:debug("[ws_channel] uuid: ~p, channel will close because host exited with reason: ~p", [UUID, Reason]),