fix directive
This commit is contained in:
parent
5f28301ed1
commit
cbc659187e
@ -45,8 +45,9 @@
|
|||||||
-define(PACKET_PUBLISH_RESPONSE, 16#04).
|
-define(PACKET_PUBLISH_RESPONSE, 16#04).
|
||||||
|
|
||||||
%% 服务端指令
|
%% 服务端指令
|
||||||
-define(PACKET_COMMAND, 16#05).
|
% directive
|
||||||
-define(PACKET_COMMAND_RESPONSE, 16#06).
|
-define(PACKET_DIRECTIVE, 16#05).
|
||||||
|
-define(PACKET_DIRECTIVE_RESPONSE, 16#06).
|
||||||
|
|
||||||
%% 事件类型
|
%% 事件类型
|
||||||
-define(EVENT_DEVICE, 16#01).
|
-define(EVENT_DEVICE, 16#01).
|
||||||
|
|||||||
@ -21,7 +21,7 @@
|
|||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]).
|
-export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]).
|
||||||
-export([get_metric/1, publish_message/4, get_aes/1, get_status/1]).
|
-export([get_metric/1, publish_message/4, get_aes/1, get_status/1, publish_directive/6, send_directive/5]).
|
||||||
-export([create_session/2, attach_channel/2]).
|
-export([create_session/2, attach_channel/2]).
|
||||||
-export([reload_device/2, delete_device/2, activate_device/3]).
|
-export([reload_device/2, delete_device/2, activate_device/3]).
|
||||||
-export([heartbeat/1]).
|
-export([heartbeat/1]).
|
||||||
@ -109,6 +109,45 @@ publish_message(Pid, CommandType, Params, Timeout) when is_pid(Pid), is_integer(
|
|||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec publish_directive(Pid :: pid(), DeviceUUID :: binary(), DirectiveType :: integer(), Version :: binary(), DirectiveParams :: binary() | map(), Timeout :: integer()) ->
|
||||||
|
ok | {ok, Response :: binary()} | {error, Reason :: any()}.
|
||||||
|
publish_directive(Pid, DeviceUUID, DirectiveType, Version, DirectiveParams, Timeout)
|
||||||
|
when is_pid(Pid), is_binary(DeviceUUID), is_integer(DirectiveType), is_binary(Version), is_binary(DirectiveParams); is_map(DirectiveParams), is_integer(Timeout) ->
|
||||||
|
|
||||||
|
Directive = #{
|
||||||
|
<<"device_uuid">> => DeviceUUID,
|
||||||
|
<<"version">> => Version,
|
||||||
|
<<"directive_type">> => DirectiveType,
|
||||||
|
<<"directive">> => DirectiveParams
|
||||||
|
},
|
||||||
|
|
||||||
|
case gen_statem:call(Pid, {publish_directive, self(), Directive}) of
|
||||||
|
{ok, Ref} ->
|
||||||
|
receive
|
||||||
|
{directive_response, Ref} ->
|
||||||
|
ok;
|
||||||
|
{directive_response, Ref, Response} ->
|
||||||
|
{ok, Response}
|
||||||
|
after Timeout ->
|
||||||
|
{error, timeout}
|
||||||
|
end;
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec send_directive(Pid :: pid(), DeviceUUID :: binary(), DirectiveType :: integer(), Version :: binary(), DirectiveParams :: binary() | map()) ->
|
||||||
|
ok | {error, Reason :: any()}.
|
||||||
|
send_directive(Pid, DeviceUUID, DirectiveType, Version, DirectiveParams)
|
||||||
|
when is_pid(Pid), is_binary(DeviceUUID), is_integer(DirectiveType), is_binary(Version), is_binary(DirectiveParams); is_map(DirectiveParams) ->
|
||||||
|
|
||||||
|
Directive = #{
|
||||||
|
<<"device_uuid">> => DeviceUUID,
|
||||||
|
<<"version">> => Version,
|
||||||
|
<<"directive_type">> => DirectiveType,
|
||||||
|
<<"directive">> => DirectiveParams
|
||||||
|
},
|
||||||
|
gen_statem:call(Pid, {send_directive, Directive}).
|
||||||
|
|
||||||
%% 设备管理相关
|
%% 设备管理相关
|
||||||
|
|
||||||
-spec reload_device(Pid :: pid(), DeviceUUID :: binary()) -> ok | {error, Reason :: any()}.
|
-spec reload_device(Pid :: pid(), DeviceUUID :: binary()) -> ok | {error, Reason :: any()}.
|
||||||
@ -232,6 +271,38 @@ handle_event({call, From}, {publish_message, _, _, _}, _, State = #state{uuid =
|
|||||||
lager:debug("[iot_host] uuid: ~p, publish_message invalid state: ~p", [UUID, state_map(State)]),
|
lager:debug("[iot_host] uuid: ~p, publish_message invalid state: ~p", [UUID, state_map(State)]),
|
||||||
{keep_state, State, [{reply, From, {error, <<"主机离线,发送命令失败"/utf8>>}}]};
|
{keep_state, State, [{reply, From, {error, <<"主机离线,发送命令失败"/utf8>>}}]};
|
||||||
|
|
||||||
|
%% 发送指令时, 指令要通过aes加密,必须要求session是存在的
|
||||||
|
handle_event({call, From}, {publish_directive, ReceiverPid, Directive0}, ?STATE_ACTIVATED,
|
||||||
|
State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid, has_session = true}) ->
|
||||||
|
|
||||||
|
lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive0]),
|
||||||
|
Directive = iot_cipher_aes:encrypt(AES, Directive0),
|
||||||
|
%% 通过websocket发送请求
|
||||||
|
Ref = ws_channel:publish_directive(ChannelPid, ReceiverPid, Directive),
|
||||||
|
|
||||||
|
{keep_state, State, [{reply, From, {ok, Ref}}]};
|
||||||
|
|
||||||
|
%% 其他情况下,发送指令是失败的
|
||||||
|
handle_event({call, From}, {publish_directive, _, Directive}, _, State = #state{uuid = UUID}) ->
|
||||||
|
lager:debug("[iot_host] uuid: ~p, publish_directive: ~p, invalid state: ~p", [UUID, Directive, state_map(State)]),
|
||||||
|
{keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]};
|
||||||
|
|
||||||
|
%% 发送指令时, 指令要通过aes加密,必须要求session是存在的
|
||||||
|
handle_event({call, From}, {send_directive, Directive0}, ?STATE_ACTIVATED,
|
||||||
|
State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid, has_session = true}) ->
|
||||||
|
|
||||||
|
lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive0]),
|
||||||
|
Directive = iot_cipher_aes:encrypt(AES, Directive0),
|
||||||
|
%% 通过websocket发送请求
|
||||||
|
ws_channel:send_directive(ChannelPid, Directive),
|
||||||
|
|
||||||
|
{keep_state, State, [{reply, From, ok}]};
|
||||||
|
|
||||||
|
%% 其他情况下,发送指令是失败的
|
||||||
|
handle_event({call, From}, {send_directive, Directive}, _, State = #state{uuid = UUID}) ->
|
||||||
|
lager:debug("[iot_host] uuid: ~p, send_directive: ~p, invalid state: ~p", [UUID, Directive, state_map(State)]),
|
||||||
|
{keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]};
|
||||||
|
|
||||||
%% 激活主机
|
%% 激活主机
|
||||||
handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, aes = Aes, channel_pid = ChannelPid}) when is_pid(ChannelPid) ->
|
handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, aes = Aes, channel_pid = ChannelPid}) when is_pid(ChannelPid) ->
|
||||||
BinReply = jiffy:encode(#{<<"auth">> => true, <<"aes">> => Aes}, [force_utf8]),
|
BinReply = jiffy:encode(#{<<"auth">> => true, <<"aes">> => Aes}, [force_utf8]),
|
||||||
|
|||||||
@ -14,7 +14,7 @@
|
|||||||
-export([init/2]).
|
-export([init/2]).
|
||||||
-export([websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]).
|
-export([websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]).
|
||||||
-export([publish/3, stop/2, send/2]).
|
-export([publish/3, stop/2, send/2]).
|
||||||
-export([publish_command/3, send_command/2]).
|
-export([publish_directive/3, send_directive/2]).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
uuid :: undefined | binary(),
|
uuid :: undefined | binary(),
|
||||||
@ -26,7 +26,7 @@
|
|||||||
%% 请求响应的对应关系
|
%% 请求响应的对应关系
|
||||||
inflight = #{},
|
inflight = #{},
|
||||||
%% 指令的对应关系
|
%% 指令的对应关系
|
||||||
cmd_inflight = #{}
|
directive_inflight = #{}
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%% 向通道中写入消息
|
%% 向通道中写入消息
|
||||||
@ -42,18 +42,15 @@ send(Pid, Msg) when is_pid(Pid), is_binary(Msg) ->
|
|||||||
Pid ! {send, Msg}.
|
Pid ! {send, Msg}.
|
||||||
|
|
||||||
%% 发送指令信息
|
%% 发送指令信息
|
||||||
|
-spec publish_directive(Pid :: pid(), ReceiverPid :: pid(), Directive :: binary()) -> Ref :: reference().
|
||||||
%% 向通道中写入消息
|
publish_directive(Pid, ReceiverPid, Directive) when is_pid(Pid), is_binary(Directive) ->
|
||||||
-spec publish_command(Pid :: pid(), ReceiverPid :: pid(), Msg :: binary()) -> Ref :: reference().
|
|
||||||
publish_command(Pid, ReceiverPid, Cmd) when is_pid(Pid), is_binary(Cmd) ->
|
|
||||||
Ref = make_ref(),
|
Ref = make_ref(),
|
||||||
Pid ! {publish_command, ReceiverPid, Ref, Cmd},
|
Pid ! {publish_directive, ReceiverPid, Ref, Directive},
|
||||||
Ref.
|
Ref.
|
||||||
|
|
||||||
%% 向通道中写入消息
|
-spec send_directive(Pid :: pid(), Directive :: binary()) -> no_return().
|
||||||
-spec send_command(Pid :: pid(), Cmd :: binary()) -> no_return().
|
send_directive(Pid, Directive) when is_pid(Pid), is_binary(Directive) ->
|
||||||
send_command(Pid, Cmd) when is_pid(Pid), is_binary(Cmd) ->
|
Pid ! {send_directive, Directive}.
|
||||||
Pid ! {send_command, Cmd}.
|
|
||||||
|
|
||||||
%% 关闭方法
|
%% 关闭方法
|
||||||
-spec stop(Pid :: pid(), Reason :: any()) -> no_return().
|
-spec stop(Pid :: pid(), Reason :: any()) -> no_return().
|
||||||
@ -157,23 +154,27 @@ websocket_handle({binary, <<?PACKET_PUBLISH_RESPONSE, PacketId:32, Body/binary>>
|
|||||||
{ok, State#state{inflight = NInflight}}
|
{ok, State#state{inflight = NInflight}}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% 命令的响应
|
%% 命令的响应, PacketId == 0 的指令就不应该有返回值
|
||||||
websocket_handle({binary, <<?PACKET_COMMAND_RESPONSE, PacketId:32, Body/binary>>}, State = #state{uuid = UUID, cmd_inflight = CmdInflight}) when PacketId > 0 ->
|
websocket_handle({binary, <<?PACKET_DIRECTIVE_RESPONSE, 0:32, Body/binary>>}, State = #state{uuid = UUID}) ->
|
||||||
lager:debug("[ws_channel] uuid: ~p, get command response message: ~p, packet_id: ~p", [UUID, Body, PacketId]),
|
lager:debug("[ws_channel] uuid: ~p, get send directive response message: ~p", [UUID, Body]),
|
||||||
case maps:take(PacketId, CmdInflight) of
|
{ok, State};
|
||||||
|
|
||||||
|
websocket_handle({binary, <<?PACKET_DIRECTIVE_RESPONSE, PacketId:32, Body/binary>>}, State = #state{uuid = UUID, directive_inflight = DirectiveInflight}) when PacketId > 0 ->
|
||||||
|
lager:debug("[ws_channel] uuid: ~p, get directive response message: ~p, packet_id: ~p", [UUID, Body, PacketId]),
|
||||||
|
case maps:take(PacketId, DirectiveInflight) of
|
||||||
error ->
|
error ->
|
||||||
lager:warning("[ws_channel] get unknown command response message: ~p, packet_id: ~p", [Body, PacketId]),
|
lager:warning("[ws_channel] get unknown directive response message: ~p, packet_id: ~p", [Body, PacketId]),
|
||||||
{ok, State};
|
{ok, State};
|
||||||
{{ReceiverPid, Ref}, NCmdInflight} ->
|
{{ReceiverPid, Ref}, NDirectiveInflight} ->
|
||||||
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
|
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
|
||||||
true when Body == <<>> ->
|
true when Body == <<>> ->
|
||||||
ReceiverPid ! {cmd_response, Ref};
|
ReceiverPid ! {directive_response, Ref};
|
||||||
true ->
|
true ->
|
||||||
ReceiverPid ! {cmd_response, Ref, Body};
|
ReceiverPid ! {directive_response, Ref, Body};
|
||||||
false ->
|
false ->
|
||||||
lager:warning("[ws_channel] get command response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Body, PacketId])
|
lager:warning("[ws_channel] get directive response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Body, PacketId])
|
||||||
end,
|
end,
|
||||||
{ok, State#state{cmd_inflight = NCmdInflight}}
|
{ok, State#state{directive_inflight = NDirectiveInflight}}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
websocket_handle(Info, State) ->
|
websocket_handle(Info, State) ->
|
||||||
@ -195,13 +196,13 @@ websocket_info({send, Msg}, State) when is_binary(Msg) ->
|
|||||||
{reply, {binary, <<?PACKET_PUBLISH, 0:32, Msg/binary>>}, State};
|
{reply, {binary, <<?PACKET_PUBLISH, 0:32, Msg/binary>>}, State};
|
||||||
|
|
||||||
%% 下发指令
|
%% 下发指令
|
||||||
websocket_info({publish_command, ReceiverPid, Ref, Cmd}, State = #state{packet_id = PacketId, cmd_inflight = CmdInflight}) when is_binary(Cmd) ->
|
websocket_info({publish_directive, ReceiverPid, Ref, Directive}, State = #state{packet_id = PacketId, directive_inflight = DirectiveInflight}) when is_binary(Directive) ->
|
||||||
NCmdInflight = maps:put(PacketId, {ReceiverPid, Ref}, CmdInflight),
|
NDirectiveInflight = maps:put(PacketId, {ReceiverPid, Ref}, DirectiveInflight),
|
||||||
{reply, {binary, <<?PACKET_COMMAND, PacketId:32, Cmd/binary>>}, State#state{packet_id = PacketId + 1, cmd_inflight = NCmdInflight}};
|
{reply, {binary, <<?PACKET_DIRECTIVE, PacketId:32, Directive/binary>>}, State#state{packet_id = PacketId + 1, directive_inflight = NDirectiveInflight}};
|
||||||
|
|
||||||
%% 发送指令, 不需要等待回复
|
%% 发送指令, 不需要等待回复
|
||||||
websocket_info({send_command, Cmd}, State) when is_binary(Cmd) ->
|
websocket_info({send_directive, Directive}, State) when is_binary(Directive) ->
|
||||||
{reply, {binary, <<?PACKET_COMMAND, 0:32, Cmd/binary>>}, State};
|
{reply, {binary, <<?PACKET_DIRECTIVE, 0:32, Directive/binary>>}, State};
|
||||||
|
|
||||||
%% 用户进程关闭,则关闭通道
|
%% 用户进程关闭,则关闭通道
|
||||||
websocket_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, host_pid = HostPid}) ->
|
websocket_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, host_pid = HostPid}) ->
|
||||||
|
|||||||
@ -246,8 +246,8 @@ Body: 事件内容,AES加密
|
|||||||
{
|
{
|
||||||
"device_uuid": "xxxxxx", // 设备的device_uuid, 数组格式
|
"device_uuid": "xxxxxx", // 设备的device_uuid, 数组格式
|
||||||
"version": "1.0",
|
"version": "1.0",
|
||||||
"command_type": 0x01, // 中电计费电表控制
|
"directive_type": 0x01, // 中电计费电表控制
|
||||||
"command": {
|
"directive": {
|
||||||
"type": "ctrl", // 遥控
|
"type": "ctrl", // 遥控
|
||||||
"stype": int, // 遥控类型,0: 遥控, 1: 遥调, 2: 置数
|
"stype": int, // 遥控类型,0: 遥控, 1: 遥调, 2: 置数
|
||||||
"ctype": int, // 遥控动作, 0: 打开,1: 闭合
|
"ctype": int, // 遥控动作, 0: 打开,1: 闭合
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user