simple directive
This commit is contained in:
parent
c6879d5183
commit
ec74c55c97
@ -44,11 +44,6 @@
|
||||
-define(PACKET_PUBLISH, 16#03).
|
||||
-define(PACKET_PUBLISH_RESPONSE, 16#04).
|
||||
|
||||
%% 服务端指令
|
||||
% directive
|
||||
-define(PACKET_DIRECTIVE, 16#05).
|
||||
-define(PACKET_DIRECTIVE_RESPONSE, 16#06).
|
||||
|
||||
%% 事件类型
|
||||
-define(EVENT_DEVICE, 16#01).
|
||||
%% 主机的相关事件
|
||||
|
||||
@ -14,7 +14,6 @@
|
||||
-export([init/2]).
|
||||
-export([websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]).
|
||||
-export([publish/3, stop/2, send/2]).
|
||||
-export([publish_directive/3, send_directive/2]).
|
||||
|
||||
-record(state, {
|
||||
uuid :: undefined | binary(),
|
||||
@ -24,9 +23,7 @@
|
||||
packet_id = 1 :: integer(),
|
||||
|
||||
%% 请求响应的对应关系
|
||||
inflight = #{},
|
||||
%% 指令的对应关系
|
||||
directive_inflight = #{}
|
||||
inflight = #{}
|
||||
}).
|
||||
|
||||
%% 向通道中写入消息
|
||||
@ -41,17 +38,6 @@ publish(Pid, ReceiverPid, Msg) when is_pid(Pid), is_binary(Msg) ->
|
||||
send(Pid, Msg) when is_pid(Pid), is_binary(Msg) ->
|
||||
Pid ! {send, Msg}.
|
||||
|
||||
%% 发送指令信息
|
||||
-spec publish_directive(Pid :: pid(), ReceiverPid :: pid(), Directive :: binary()) -> Ref :: reference().
|
||||
publish_directive(Pid, ReceiverPid, Directive) when is_pid(Pid), is_binary(Directive) ->
|
||||
Ref = make_ref(),
|
||||
Pid ! {publish_directive, ReceiverPid, Ref, Directive},
|
||||
Ref.
|
||||
|
||||
-spec send_directive(Pid :: pid(), Directive :: binary()) -> no_return().
|
||||
send_directive(Pid, Directive) when is_pid(Pid), is_binary(Directive) ->
|
||||
Pid ! {send_directive, Directive}.
|
||||
|
||||
%% 关闭方法
|
||||
-spec stop(Pid :: pid(), Reason :: any()) -> no_return().
|
||||
stop(undefined, _Reason) ->
|
||||
@ -154,29 +140,6 @@ websocket_handle({binary, <<?PACKET_PUBLISH_RESPONSE, PacketId:32, Body/binary>>
|
||||
{ok, State#state{inflight = NInflight}}
|
||||
end;
|
||||
|
||||
%% 命令的响应, PacketId == 0 的指令就不应该有返回值
|
||||
websocket_handle({binary, <<?PACKET_DIRECTIVE_RESPONSE, 0:32, Body/binary>>}, State = #state{uuid = UUID}) ->
|
||||
lager:debug("[ws_channel] uuid: ~p, get send directive response message: ~p", [UUID, Body]),
|
||||
{ok, State};
|
||||
|
||||
websocket_handle({binary, <<?PACKET_DIRECTIVE_RESPONSE, PacketId:32, Body/binary>>}, State = #state{uuid = UUID, directive_inflight = DirectiveInflight}) when PacketId > 0 ->
|
||||
lager:debug("[ws_channel] uuid: ~p, get directive response message: ~p, packet_id: ~p", [UUID, Body, PacketId]),
|
||||
case maps:take(PacketId, DirectiveInflight) of
|
||||
error ->
|
||||
lager:warning("[ws_channel] get unknown directive response message: ~p, packet_id: ~p", [Body, PacketId]),
|
||||
{ok, State};
|
||||
{{ReceiverPid, Ref}, NDirectiveInflight} ->
|
||||
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
|
||||
true when Body == <<>> ->
|
||||
ReceiverPid ! {directive_response, Ref};
|
||||
true ->
|
||||
ReceiverPid ! {directive_response, Ref, Body};
|
||||
false ->
|
||||
lager:warning("[ws_channel] get directive response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Body, PacketId])
|
||||
end,
|
||||
{ok, State#state{directive_inflight = NDirectiveInflight}}
|
||||
end;
|
||||
|
||||
websocket_handle(Info, State) ->
|
||||
lager:error("[ws_channel] get a unknown message: ~p, channel will closed", [Info]),
|
||||
{stop, State}.
|
||||
@ -195,15 +158,6 @@ websocket_info({publish, ReceiverPid, Ref, Msg}, State = #state{packet_id = Pack
|
||||
websocket_info({send, Msg}, State) when is_binary(Msg) ->
|
||||
{reply, {binary, <<?PACKET_PUBLISH, 0:32, Msg/binary>>}, State};
|
||||
|
||||
%% 下发指令
|
||||
websocket_info({publish_directive, ReceiverPid, Ref, Directive}, State = #state{packet_id = PacketId, directive_inflight = DirectiveInflight}) when is_binary(Directive) ->
|
||||
NDirectiveInflight = maps:put(PacketId, {ReceiverPid, Ref}, DirectiveInflight),
|
||||
{reply, {binary, <<?PACKET_DIRECTIVE, PacketId:32, Directive/binary>>}, State#state{packet_id = PacketId + 1, directive_inflight = NDirectiveInflight}};
|
||||
|
||||
%% 发送指令, 不需要等待回复
|
||||
websocket_info({send_directive, Directive}, State) when is_binary(Directive) ->
|
||||
{reply, {binary, <<?PACKET_DIRECTIVE, 0:32, Directive/binary>>}, State};
|
||||
|
||||
%% 用户进程关闭,则关闭通道
|
||||
websocket_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, host_pid = HostPid}) ->
|
||||
lager:debug("[ws_channel] uuid: ~p, channel will close because host exited with reason: ~p", [UUID, Reason]),
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user