From cbc659187ea02cdd2f844cc1a2b55370760d53c6 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 22 Dec 2023 16:28:28 +0800 Subject: [PATCH] fix directive --- apps/iot/include/iot.hrl | 5 +- apps/iot/src/iot_host.erl | 73 ++++++++++++++++++++++++++- apps/iot/src/websocket/ws_channel.erl | 53 +++++++++---------- docs/websocket.md | 4 +- 4 files changed, 104 insertions(+), 31 deletions(-) diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index f1dc1b7..4d6264e 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -45,8 +45,9 @@ -define(PACKET_PUBLISH_RESPONSE, 16#04). %% 服务端指令 --define(PACKET_COMMAND, 16#05). --define(PACKET_COMMAND_RESPONSE, 16#06). +% directive +-define(PACKET_DIRECTIVE, 16#05). +-define(PACKET_DIRECTIVE_RESPONSE, 16#06). %% 事件类型 -define(EVENT_DEVICE, 16#01). diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 2f4f3a4..0764b96 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -21,7 +21,7 @@ %% API -export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]). --export([get_metric/1, publish_message/4, get_aes/1, get_status/1]). +-export([get_metric/1, publish_message/4, get_aes/1, get_status/1, publish_directive/6, send_directive/5]). -export([create_session/2, attach_channel/2]). -export([reload_device/2, delete_device/2, activate_device/3]). -export([heartbeat/1]). @@ -109,6 +109,45 @@ publish_message(Pid, CommandType, Params, Timeout) when is_pid(Pid), is_integer( {error, Reason} end. +-spec publish_directive(Pid :: pid(), DeviceUUID :: binary(), DirectiveType :: integer(), Version :: binary(), DirectiveParams :: binary() | map(), Timeout :: integer()) -> + ok | {ok, Response :: binary()} | {error, Reason :: any()}. +publish_directive(Pid, DeviceUUID, DirectiveType, Version, DirectiveParams, Timeout) + when is_pid(Pid), is_binary(DeviceUUID), is_integer(DirectiveType), is_binary(Version), is_binary(DirectiveParams); is_map(DirectiveParams), is_integer(Timeout) -> + + Directive = #{ + <<"device_uuid">> => DeviceUUID, + <<"version">> => Version, + <<"directive_type">> => DirectiveType, + <<"directive">> => DirectiveParams + }, + + case gen_statem:call(Pid, {publish_directive, self(), Directive}) of + {ok, Ref} -> + receive + {directive_response, Ref} -> + ok; + {directive_response, Ref, Response} -> + {ok, Response} + after Timeout -> + {error, timeout} + end; + {error, Reason} -> + {error, Reason} + end. + +-spec send_directive(Pid :: pid(), DeviceUUID :: binary(), DirectiveType :: integer(), Version :: binary(), DirectiveParams :: binary() | map()) -> + ok | {error, Reason :: any()}. +send_directive(Pid, DeviceUUID, DirectiveType, Version, DirectiveParams) + when is_pid(Pid), is_binary(DeviceUUID), is_integer(DirectiveType), is_binary(Version), is_binary(DirectiveParams); is_map(DirectiveParams) -> + + Directive = #{ + <<"device_uuid">> => DeviceUUID, + <<"version">> => Version, + <<"directive_type">> => DirectiveType, + <<"directive">> => DirectiveParams + }, + gen_statem:call(Pid, {send_directive, Directive}). + %% 设备管理相关 -spec reload_device(Pid :: pid(), DeviceUUID :: binary()) -> ok | {error, Reason :: any()}. @@ -232,6 +271,38 @@ handle_event({call, From}, {publish_message, _, _, _}, _, State = #state{uuid = lager:debug("[iot_host] uuid: ~p, publish_message invalid state: ~p", [UUID, state_map(State)]), {keep_state, State, [{reply, From, {error, <<"主机离线,发送命令失败"/utf8>>}}]}; +%% 发送指令时, 指令要通过aes加密,必须要求session是存在的 +handle_event({call, From}, {publish_directive, ReceiverPid, Directive0}, ?STATE_ACTIVATED, + State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid, has_session = true}) -> + + lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive0]), + Directive = iot_cipher_aes:encrypt(AES, Directive0), + %% 通过websocket发送请求 + Ref = ws_channel:publish_directive(ChannelPid, ReceiverPid, Directive), + + {keep_state, State, [{reply, From, {ok, Ref}}]}; + +%% 其他情况下,发送指令是失败的 +handle_event({call, From}, {publish_directive, _, Directive}, _, State = #state{uuid = UUID}) -> + lager:debug("[iot_host] uuid: ~p, publish_directive: ~p, invalid state: ~p", [UUID, Directive, state_map(State)]), + {keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]}; + +%% 发送指令时, 指令要通过aes加密,必须要求session是存在的 +handle_event({call, From}, {send_directive, Directive0}, ?STATE_ACTIVATED, + State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid, has_session = true}) -> + + lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive0]), + Directive = iot_cipher_aes:encrypt(AES, Directive0), + %% 通过websocket发送请求 + ws_channel:send_directive(ChannelPid, Directive), + + {keep_state, State, [{reply, From, ok}]}; + +%% 其他情况下,发送指令是失败的 +handle_event({call, From}, {send_directive, Directive}, _, State = #state{uuid = UUID}) -> + lager:debug("[iot_host] uuid: ~p, send_directive: ~p, invalid state: ~p", [UUID, Directive, state_map(State)]), + {keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]}; + %% 激活主机 handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, aes = Aes, channel_pid = ChannelPid}) when is_pid(ChannelPid) -> BinReply = jiffy:encode(#{<<"auth">> => true, <<"aes">> => Aes}, [force_utf8]), diff --git a/apps/iot/src/websocket/ws_channel.erl b/apps/iot/src/websocket/ws_channel.erl index 7a0d326..bdf5dad 100644 --- a/apps/iot/src/websocket/ws_channel.erl +++ b/apps/iot/src/websocket/ws_channel.erl @@ -14,7 +14,7 @@ -export([init/2]). -export([websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]). -export([publish/3, stop/2, send/2]). --export([publish_command/3, send_command/2]). +-export([publish_directive/3, send_directive/2]). -record(state, { uuid :: undefined | binary(), @@ -26,7 +26,7 @@ %% 请求响应的对应关系 inflight = #{}, %% 指令的对应关系 - cmd_inflight = #{} + directive_inflight = #{} }). %% 向通道中写入消息 @@ -42,18 +42,15 @@ send(Pid, Msg) when is_pid(Pid), is_binary(Msg) -> Pid ! {send, Msg}. %% 发送指令信息 - -%% 向通道中写入消息 --spec publish_command(Pid :: pid(), ReceiverPid :: pid(), Msg :: binary()) -> Ref :: reference(). -publish_command(Pid, ReceiverPid, Cmd) when is_pid(Pid), is_binary(Cmd) -> +-spec publish_directive(Pid :: pid(), ReceiverPid :: pid(), Directive :: binary()) -> Ref :: reference(). +publish_directive(Pid, ReceiverPid, Directive) when is_pid(Pid), is_binary(Directive) -> Ref = make_ref(), - Pid ! {publish_command, ReceiverPid, Ref, Cmd}, + Pid ! {publish_directive, ReceiverPid, Ref, Directive}, Ref. -%% 向通道中写入消息 --spec send_command(Pid :: pid(), Cmd :: binary()) -> no_return(). -send_command(Pid, Cmd) when is_pid(Pid), is_binary(Cmd) -> - Pid ! {send_command, Cmd}. +-spec send_directive(Pid :: pid(), Directive :: binary()) -> no_return(). +send_directive(Pid, Directive) when is_pid(Pid), is_binary(Directive) -> + Pid ! {send_directive, Directive}. %% 关闭方法 -spec stop(Pid :: pid(), Reason :: any()) -> no_return(). @@ -157,23 +154,27 @@ websocket_handle({binary, <> {ok, State#state{inflight = NInflight}} end; -%% 命令的响应 -websocket_handle({binary, <>}, State = #state{uuid = UUID, cmd_inflight = CmdInflight}) when PacketId > 0 -> - lager:debug("[ws_channel] uuid: ~p, get command response message: ~p, packet_id: ~p", [UUID, Body, PacketId]), - case maps:take(PacketId, CmdInflight) of +%% 命令的响应, PacketId == 0 的指令就不应该有返回值 +websocket_handle({binary, <>}, State = #state{uuid = UUID}) -> + lager:debug("[ws_channel] uuid: ~p, get send directive response message: ~p", [UUID, Body]), + {ok, State}; + +websocket_handle({binary, <>}, State = #state{uuid = UUID, directive_inflight = DirectiveInflight}) when PacketId > 0 -> + lager:debug("[ws_channel] uuid: ~p, get directive response message: ~p, packet_id: ~p", [UUID, Body, PacketId]), + case maps:take(PacketId, DirectiveInflight) of error -> - lager:warning("[ws_channel] get unknown command response message: ~p, packet_id: ~p", [Body, PacketId]), + lager:warning("[ws_channel] get unknown directive response message: ~p, packet_id: ~p", [Body, PacketId]), {ok, State}; - {{ReceiverPid, Ref}, NCmdInflight} -> + {{ReceiverPid, Ref}, NDirectiveInflight} -> case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of true when Body == <<>> -> - ReceiverPid ! {cmd_response, Ref}; + ReceiverPid ! {directive_response, Ref}; true -> - ReceiverPid ! {cmd_response, Ref, Body}; + ReceiverPid ! {directive_response, Ref, Body}; false -> - lager:warning("[ws_channel] get command response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Body, PacketId]) + lager:warning("[ws_channel] get directive response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Body, PacketId]) end, - {ok, State#state{cmd_inflight = NCmdInflight}} + {ok, State#state{directive_inflight = NDirectiveInflight}} end; websocket_handle(Info, State) -> @@ -195,13 +196,13 @@ websocket_info({send, Msg}, State) when is_binary(Msg) -> {reply, {binary, <>}, State}; %% 下发指令 -websocket_info({publish_command, ReceiverPid, Ref, Cmd}, State = #state{packet_id = PacketId, cmd_inflight = CmdInflight}) when is_binary(Cmd) -> - NCmdInflight = maps:put(PacketId, {ReceiverPid, Ref}, CmdInflight), - {reply, {binary, <>}, State#state{packet_id = PacketId + 1, cmd_inflight = NCmdInflight}}; +websocket_info({publish_directive, ReceiverPid, Ref, Directive}, State = #state{packet_id = PacketId, directive_inflight = DirectiveInflight}) when is_binary(Directive) -> + NDirectiveInflight = maps:put(PacketId, {ReceiverPid, Ref}, DirectiveInflight), + {reply, {binary, <>}, State#state{packet_id = PacketId + 1, directive_inflight = NDirectiveInflight}}; %% 发送指令, 不需要等待回复 -websocket_info({send_command, Cmd}, State) when is_binary(Cmd) -> - {reply, {binary, <>}, State}; +websocket_info({send_directive, Directive}, State) when is_binary(Directive) -> + {reply, {binary, <>}, State}; %% 用户进程关闭,则关闭通道 websocket_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, host_pid = HostPid}) -> diff --git a/docs/websocket.md b/docs/websocket.md index 5ae07ef..37eae6c 100644 --- a/docs/websocket.md +++ b/docs/websocket.md @@ -246,8 +246,8 @@ Body: 事件内容,AES加密 { "device_uuid": "xxxxxx", // 设备的device_uuid, 数组格式 "version": "1.0", - "command_type": 0x01, // 中电计费电表控制 - "command": { + "directive_type": 0x01, // 中电计费电表控制 + "directive": { "type": "ctrl", // 遥控 "stype": int, // 遥控类型,0: 遥控, 1: 遥调, 2: 置数 "ctype": int, // 遥控动作, 0: 打开,1: 闭合