diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 602cc29..39cd286 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -44,11 +44,6 @@ -define(PACKET_PUBLISH, 16#03). -define(PACKET_PUBLISH_RESPONSE, 16#04). -%% 服务端指令 -% directive --define(PACKET_DIRECTIVE, 16#05). --define(PACKET_DIRECTIVE_RESPONSE, 16#06). - %% 事件类型 -define(EVENT_DEVICE, 16#01). %% 主机的相关事件 diff --git a/apps/iot/src/websocket/ws_channel.erl b/apps/iot/src/websocket/ws_channel.erl index bdf5dad..943ec6c 100644 --- a/apps/iot/src/websocket/ws_channel.erl +++ b/apps/iot/src/websocket/ws_channel.erl @@ -14,7 +14,6 @@ -export([init/2]). -export([websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]). -export([publish/3, stop/2, send/2]). --export([publish_directive/3, send_directive/2]). -record(state, { uuid :: undefined | binary(), @@ -24,9 +23,7 @@ packet_id = 1 :: integer(), %% 请求响应的对应关系 - inflight = #{}, - %% 指令的对应关系 - directive_inflight = #{} + inflight = #{} }). %% 向通道中写入消息 @@ -41,17 +38,6 @@ publish(Pid, ReceiverPid, Msg) when is_pid(Pid), is_binary(Msg) -> send(Pid, Msg) when is_pid(Pid), is_binary(Msg) -> Pid ! {send, Msg}. -%% 发送指令信息 --spec publish_directive(Pid :: pid(), ReceiverPid :: pid(), Directive :: binary()) -> Ref :: reference(). -publish_directive(Pid, ReceiverPid, Directive) when is_pid(Pid), is_binary(Directive) -> - Ref = make_ref(), - Pid ! {publish_directive, ReceiverPid, Ref, Directive}, - Ref. - --spec send_directive(Pid :: pid(), Directive :: binary()) -> no_return(). -send_directive(Pid, Directive) when is_pid(Pid), is_binary(Directive) -> - Pid ! {send_directive, Directive}. - %% 关闭方法 -spec stop(Pid :: pid(), Reason :: any()) -> no_return(). stop(undefined, _Reason) -> @@ -154,29 +140,6 @@ websocket_handle({binary, <> {ok, State#state{inflight = NInflight}} end; -%% 命令的响应, PacketId == 0 的指令就不应该有返回值 -websocket_handle({binary, <>}, State = #state{uuid = UUID}) -> - lager:debug("[ws_channel] uuid: ~p, get send directive response message: ~p", [UUID, Body]), - {ok, State}; - -websocket_handle({binary, <>}, State = #state{uuid = UUID, directive_inflight = DirectiveInflight}) when PacketId > 0 -> - lager:debug("[ws_channel] uuid: ~p, get directive response message: ~p, packet_id: ~p", [UUID, Body, PacketId]), - case maps:take(PacketId, DirectiveInflight) of - error -> - lager:warning("[ws_channel] get unknown directive response message: ~p, packet_id: ~p", [Body, PacketId]), - {ok, State}; - {{ReceiverPid, Ref}, NDirectiveInflight} -> - case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of - true when Body == <<>> -> - ReceiverPid ! {directive_response, Ref}; - true -> - ReceiverPid ! {directive_response, Ref, Body}; - false -> - lager:warning("[ws_channel] get directive response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Body, PacketId]) - end, - {ok, State#state{directive_inflight = NDirectiveInflight}} - end; - websocket_handle(Info, State) -> lager:error("[ws_channel] get a unknown message: ~p, channel will closed", [Info]), {stop, State}. @@ -195,15 +158,6 @@ websocket_info({publish, ReceiverPid, Ref, Msg}, State = #state{packet_id = Pack websocket_info({send, Msg}, State) when is_binary(Msg) -> {reply, {binary, <>}, State}; -%% 下发指令 -websocket_info({publish_directive, ReceiverPid, Ref, Directive}, State = #state{packet_id = PacketId, directive_inflight = DirectiveInflight}) when is_binary(Directive) -> - NDirectiveInflight = maps:put(PacketId, {ReceiverPid, Ref}, DirectiveInflight), - {reply, {binary, <>}, State#state{packet_id = PacketId + 1, directive_inflight = NDirectiveInflight}}; - -%% 发送指令, 不需要等待回复 -websocket_info({send_directive, Directive}, State) when is_binary(Directive) -> - {reply, {binary, <>}, State}; - %% 用户进程关闭,则关闭通道 websocket_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, host_pid = HostPid}) -> lager:debug("[ws_channel] uuid: ~p, channel will close because host exited with reason: ~p", [UUID, Reason]),