fix ws command
This commit is contained in:
parent
2da8a40ac2
commit
5f28301ed1
@ -14,6 +14,7 @@
|
|||||||
-export([init/2]).
|
-export([init/2]).
|
||||||
-export([websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]).
|
-export([websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]).
|
||||||
-export([publish/3, stop/2, send/2]).
|
-export([publish/3, stop/2, send/2]).
|
||||||
|
-export([publish_command/3, send_command/2]).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
uuid :: undefined | binary(),
|
uuid :: undefined | binary(),
|
||||||
@ -23,7 +24,9 @@
|
|||||||
packet_id = 1 :: integer(),
|
packet_id = 1 :: integer(),
|
||||||
|
|
||||||
%% 请求响应的对应关系
|
%% 请求响应的对应关系
|
||||||
inflight = #{}
|
inflight = #{},
|
||||||
|
%% 指令的对应关系
|
||||||
|
cmd_inflight = #{}
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%% 向通道中写入消息
|
%% 向通道中写入消息
|
||||||
@ -38,6 +41,20 @@ publish(Pid, ReceiverPid, Msg) when is_pid(Pid), is_binary(Msg) ->
|
|||||||
send(Pid, Msg) when is_pid(Pid), is_binary(Msg) ->
|
send(Pid, Msg) when is_pid(Pid), is_binary(Msg) ->
|
||||||
Pid ! {send, Msg}.
|
Pid ! {send, Msg}.
|
||||||
|
|
||||||
|
%% 发送指令信息
|
||||||
|
|
||||||
|
%% 向通道中写入消息
|
||||||
|
-spec publish_command(Pid :: pid(), ReceiverPid :: pid(), Msg :: binary()) -> Ref :: reference().
|
||||||
|
publish_command(Pid, ReceiverPid, Cmd) when is_pid(Pid), is_binary(Cmd) ->
|
||||||
|
Ref = make_ref(),
|
||||||
|
Pid ! {publish_command, ReceiverPid, Ref, Cmd},
|
||||||
|
Ref.
|
||||||
|
|
||||||
|
%% 向通道中写入消息
|
||||||
|
-spec send_command(Pid :: pid(), Cmd :: binary()) -> no_return().
|
||||||
|
send_command(Pid, Cmd) when is_pid(Pid), is_binary(Cmd) ->
|
||||||
|
Pid ! {send_command, Cmd}.
|
||||||
|
|
||||||
%% 关闭方法
|
%% 关闭方法
|
||||||
-spec stop(Pid :: pid(), Reason :: any()) -> no_return().
|
-spec stop(Pid :: pid(), Reason :: any()) -> no_return().
|
||||||
stop(undefined, _Reason) ->
|
stop(undefined, _Reason) ->
|
||||||
@ -140,6 +157,25 @@ websocket_handle({binary, <<?PACKET_PUBLISH_RESPONSE, PacketId:32, Body/binary>>
|
|||||||
{ok, State#state{inflight = NInflight}}
|
{ok, State#state{inflight = NInflight}}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
%% 命令的响应
|
||||||
|
websocket_handle({binary, <<?PACKET_COMMAND_RESPONSE, PacketId:32, Body/binary>>}, State = #state{uuid = UUID, cmd_inflight = CmdInflight}) when PacketId > 0 ->
|
||||||
|
lager:debug("[ws_channel] uuid: ~p, get command response message: ~p, packet_id: ~p", [UUID, Body, PacketId]),
|
||||||
|
case maps:take(PacketId, CmdInflight) of
|
||||||
|
error ->
|
||||||
|
lager:warning("[ws_channel] get unknown command response message: ~p, packet_id: ~p", [Body, PacketId]),
|
||||||
|
{ok, State};
|
||||||
|
{{ReceiverPid, Ref}, NCmdInflight} ->
|
||||||
|
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
|
||||||
|
true when Body == <<>> ->
|
||||||
|
ReceiverPid ! {cmd_response, Ref};
|
||||||
|
true ->
|
||||||
|
ReceiverPid ! {cmd_response, Ref, Body};
|
||||||
|
false ->
|
||||||
|
lager:warning("[ws_channel] get command response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Body, PacketId])
|
||||||
|
end,
|
||||||
|
{ok, State#state{cmd_inflight = NCmdInflight}}
|
||||||
|
end;
|
||||||
|
|
||||||
websocket_handle(Info, State) ->
|
websocket_handle(Info, State) ->
|
||||||
lager:error("[ws_channel] get a unknown message: ~p, channel will closed", [Info]),
|
lager:error("[ws_channel] get a unknown message: ~p, channel will closed", [Info]),
|
||||||
{stop, State}.
|
{stop, State}.
|
||||||
@ -158,6 +194,15 @@ websocket_info({publish, ReceiverPid, Ref, Msg}, State = #state{packet_id = Pack
|
|||||||
websocket_info({send, Msg}, State) when is_binary(Msg) ->
|
websocket_info({send, Msg}, State) when is_binary(Msg) ->
|
||||||
{reply, {binary, <<?PACKET_PUBLISH, 0:32, Msg/binary>>}, State};
|
{reply, {binary, <<?PACKET_PUBLISH, 0:32, Msg/binary>>}, State};
|
||||||
|
|
||||||
|
%% 下发指令
|
||||||
|
websocket_info({publish_command, ReceiverPid, Ref, Cmd}, State = #state{packet_id = PacketId, cmd_inflight = CmdInflight}) when is_binary(Cmd) ->
|
||||||
|
NCmdInflight = maps:put(PacketId, {ReceiverPid, Ref}, CmdInflight),
|
||||||
|
{reply, {binary, <<?PACKET_COMMAND, PacketId:32, Cmd/binary>>}, State#state{packet_id = PacketId + 1, cmd_inflight = NCmdInflight}};
|
||||||
|
|
||||||
|
%% 发送指令, 不需要等待回复
|
||||||
|
websocket_info({send_command, Cmd}, State) when is_binary(Cmd) ->
|
||||||
|
{reply, {binary, <<?PACKET_COMMAND, 0:32, Cmd/binary>>}, State};
|
||||||
|
|
||||||
%% 用户进程关闭,则关闭通道
|
%% 用户进程关闭,则关闭通道
|
||||||
websocket_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, host_pid = HostPid}) ->
|
websocket_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, host_pid = HostPid}) ->
|
||||||
lager:debug("[ws_channel] uuid: ~p, channel will close because host exited with reason: ~p", [UUID, Reason]),
|
lager:debug("[ws_channel] uuid: ~p, channel will close because host exited with reason: ~p", [UUID, Reason]),
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user