From fb0ad40a44e8a33b64f1328ae8db4731cadca93e Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Thu, 18 Sep 2025 15:02:31 +0800 Subject: [PATCH] fix http --- apps/iot/include/message.hrl | 39 +++++------- ...vice_handler.erl => container_handler.erl} | 12 ++-- .../{http_server.erl => http_protocol.erl} | 29 +-------- apps/iot/src/iot_app.erl | 27 +++++++- apps/iot/src/iot_host.erl | 61 ++++++++----------- apps/iot/src/message/message_codec.erl | 14 ++--- apps/iot/src/tcp/tcp_channel.erl | 34 +++++------ 7 files changed, 96 insertions(+), 120 deletions(-) rename apps/iot/src/http_handlers/{service_handler.erl => container_handler.erl} (89%) rename apps/iot/src/http_handlers/{http_server.erl => http_protocol.erl} (77%) diff --git a/apps/iot/include/message.hrl b/apps/iot/include/message.hrl index f76b53f..f5f13c9 100644 --- a/apps/iot/include/message.hrl +++ b/apps/iot/include/message.hrl @@ -15,32 +15,21 @@ %% efka主动发起不需要返回的数据 -define(PACKET_CAST, 16#03). -%% 服务器基于pub/sub的消息, 消息大类 --define(PACKET_PUB, 16#04). -%% push调用不需要返回, 消息大类 --define(PACKET_COMMAND, 16#05). - -%% 服务器主动发起的RPC调用 -%% 服务端不能直接和Service通讯,链路太麻烦了,并且服务端也不知道如何和服务通讯 --define(PACKET_RPC, 16#10). --define(PACKET_RPC_REPLY, 16#11). - -%% ping包,客户端主动发起 --define(PACKET_PING, 16#FF). - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%% 二级分类定义 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% 主机端上报数据类型标识 -define(MESSAGE_AUTH_REQUEST, 16#01). --define(MESSAGE_AUTH_REPLY, 16#02). --define(MESSAGE_PUB, 16#03). --define(MESSAGE_COMMAND, 16#04). --define(MESSAGE_RPC_DEPLOY, 16#05). --define(MESSAGE_RPC_CONTAINER, 16#06). --define(MESSAGE_DATA, 16#07). --define(MESSAGE_EVENT, 16#08). +-define(MESSAGE_PUB, 16#02). +-define(MESSAGE_COMMAND, 16#03). +-define(MESSAGE_RPC_DEPLOY, 16#04). +-define(MESSAGE_RPC_CONTAINER, 16#05). +-define(MESSAGE_DATA, 16#06). +-define(MESSAGE_EVENT, 16#07). + +%% 响应数据 +-define(MESSAGE_RPC_REPLY, 16#FF). %%%% 命令类型子分类, 不需要返回值 %% 授权 @@ -54,11 +43,6 @@ timestamp :: integer() }). --record(auth_reply, { - code :: integer(), - message :: binary() -}). - -record(pub, { topic :: binary(), content :: binary() @@ -80,6 +64,11 @@ params = <<>> :: binary() }). +-record(rpc_reply, { + code :: integer(), + payload :: binary() +}). + -record(data, { service_id :: binary(), device_uuid :: binary(), diff --git a/apps/iot/src/http_handlers/service_handler.erl b/apps/iot/src/http_handlers/container_handler.erl similarity index 89% rename from apps/iot/src/http_handlers/service_handler.erl rename to apps/iot/src/http_handlers/container_handler.erl index 9e511f4..1eb0e1f 100644 --- a/apps/iot/src/http_handlers/service_handler.erl +++ b/apps/iot/src/http_handlers/container_handler.erl @@ -6,7 +6,7 @@ %%% @end %%% Created : 26. 4月 2020 3:36 下午 %%%------------------------------------------------------------------- --module(service_handler). +-module(container_handler). -author("licheng5"). -include("iot.hrl"). @@ -14,7 +14,7 @@ -export([handle_request/4]). %% 下发config.json, 微服务接受后,保存服务配置 -handle_request("POST", "/service/push_config", _, +handle_request("POST", "/container/push_config", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId, <<"config_json">> := ConfigJson, <<"timeout">> := Timeout0}) when is_binary(UUID), is_binary(ServiceId), is_binary(ConfigJson), is_integer(Timeout0) -> @@ -39,7 +39,7 @@ handle_request("POST", "/service/push_config", _, end; %% 部署微服务 -handle_request("POST", "/service/deploy", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId, <<"service_id">> := ServiceId, <<"tar_url">> := TarUrl}) +handle_request("POST", "/container/deploy", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId, <<"service_id">> := ServiceId, <<"tar_url">> := TarUrl}) when is_binary(UUID), is_integer(TaskId), is_binary(ServiceId), is_binary(TarUrl) -> case iot_host:get_pid(UUID) of @@ -60,7 +60,7 @@ handle_request("POST", "/service/deploy", _, #{<<"uuid">> := UUID, <<"task_id">> end; %% 启动服务 -handle_request("POST", "/service/start", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId}) when is_binary(UUID), is_binary(ServiceId) -> +handle_request("POST", "/container/start", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId}) when is_binary(UUID), is_binary(ServiceId) -> case iot_host:get_pid(UUID) of undefined -> {ok, 200, iot_util:json_error(404, <<"host not found">>)}; @@ -79,7 +79,7 @@ handle_request("POST", "/service/start", _, #{<<"uuid">> := UUID, <<"service_id" end; %% 停止服务 -handle_request("POST", "/service/stop", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId}) when is_binary(UUID), is_binary(ServiceId) -> +handle_request("POST", "/container/stop", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId}) when is_binary(UUID), is_binary(ServiceId) -> case iot_host:get_pid(UUID) of undefined -> {ok, 200, iot_util:json_error(404, <<"host not found">>)}; @@ -119,7 +119,7 @@ handle_request("POST", "/service/invoke", _, #{<<"uuid">> := UUID, <<"service_id end end; -handle_request("POST", "/service/task_log", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId}) when is_binary(UUID), is_integer(TaskId) -> +handle_request("POST", "/container/task_log", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId}) when is_binary(UUID), is_integer(TaskId) -> case iot_host:get_pid(UUID) of undefined -> {ok, 200, iot_util:json_error(404, <<"host not found">>)}; diff --git a/apps/iot/src/http_handlers/http_server.erl b/apps/iot/src/http_handlers/http_protocol.erl similarity index 77% rename from apps/iot/src/http_handlers/http_server.erl rename to apps/iot/src/http_handlers/http_protocol.erl index ea4debe..f8cb7b9 100644 --- a/apps/iot/src/http_handlers/http_server.erl +++ b/apps/iot/src/http_handlers/http_protocol.erl @@ -6,39 +6,12 @@ %%% @end %%% Created : 08. 5月 2025 13:00 %%%------------------------------------------------------------------- --module(http_server). +-module(http_protocol). -author("anlicheng"). %% API --export([start/0]). -export([init/2]). -%% 启动http服务 -start() -> - {ok, Props} = application:get_env(iot, http_server), - Acceptors = proplists:get_value(acceptors, Props, 50), - MaxConnections = proplists:get_value(max_connections, Props, 10240), - Backlog = proplists:get_value(backlog, Props, 1024), - Port = proplists:get_value(port, Props), - - Dispatcher = cowboy_router:compile([ - {'_', [ - {"/host/[...]", ?MODULE, [host_handler]}, - {"/service/[...]", ?MODULE, [service_handler]}, - {"/device/[...]", ?MODULE, [device_handler]} - ]} - ]), - - TransOpts = [ - {port, Port}, - {num_acceptors, Acceptors}, - {backlog, Backlog}, - {max_connections, MaxConnections} - ], - {ok, Pid} = cowboy:start_clear(http_listener, TransOpts, #{env => #{dispatch => Dispatcher}}), - - lager:debug("[http_server] the http server start at: ~p, pid is: ~p", [Port, Pid]). - init(Req0, Opts = [Mod|_]) -> Method = binary_to_list(cowboy_req:method(Req0)), Path = binary_to_list(cowboy_req:path(Req0)), diff --git a/apps/iot/src/iot_app.erl b/apps/iot/src/iot_app.erl index 4e65d50..c0f0cfe 100644 --- a/apps/iot/src/iot_app.erl +++ b/apps/iot/src/iot_app.erl @@ -16,7 +16,7 @@ start(_StartType, _StartArgs) -> start_mnesia(), %% 启动http服务 - http_server:start(), + start_http_server(), %% 启动tcp服务 tcp_server:start(), @@ -38,6 +38,31 @@ start_mnesia() -> %% 创建数据库表 ok. +start_http_server() -> + {ok, Props} = application:get_env(iot, http_server), + Acceptors = proplists:get_value(acceptors, Props, 50), + MaxConnections = proplists:get_value(max_connections, Props, 10240), + Backlog = proplists:get_value(backlog, Props, 1024), + Port = proplists:get_value(port, Props), + + Dispatcher = cowboy_router:compile([ + {'_', [ + {"/host/[...]", http_protocol, [host_handler]}, + {"/container/[...]", http_protocol, [container_handler]}, + {"/device/[...]", http_protocol, [device_handler]} + ]} + ]), + + TransOpts = [ + {port, Port}, + {num_acceptors, Acceptors}, + {backlog, Backlog}, + {max_connections, MaxConnections} + ], + {ok, Pid} = cowboy:start_clear(http_listener, TransOpts, #{env => #{dispatch => Dispatcher}}), + + lager:debug("[http_server] the http server start at: ~p, pid is: ~p", [Port, Pid]). + -spec ensure_mnesia_schema() -> any(). ensure_mnesia_schema() -> case mnesia:system_info(use_dir) of diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index cf39329..36767b7 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -9,7 +9,7 @@ -module(iot_host). -author("aresei"). -include("iot.hrl"). --include("message_pb.hrl"). +-include("message.hrl"). -behaviour(gen_statem). @@ -25,7 +25,7 @@ -export([get_metric/1, get_status/1]). %% 通讯相关 -export([pub/3, attach_channel/2, command/3]). --export([deploy_service/4, start_service/2, stop_service/2, invoke_service/4, async_service_config/4, task_log/2, await_reply/2]). +-export([deploy_container/3, start_container/2, stop_container/2, config_container/3, await_reply/2]). %% 设备管理 -export([reload_device/2, delete_device/2, activate_device/3]). -export([heartbeat/1]). @@ -89,40 +89,37 @@ get_metric(Pid) when is_pid(Pid) -> attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) -> gen_statem:call(Pid, {attach_channel, ChannelPid}). --spec async_service_config(Pid :: pid(), ServiceId :: binary(), ConfigJson :: binary(), Timeout :: integer()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. -async_service_config(Pid, ServiceId, ConfigJson, Timeout) when is_pid(Pid), is_binary(ServiceId), is_binary(ConfigJson), is_integer(Timeout) -> - ConfigBin = message_pb:encode_msg(#push_service_config{service_id = ServiceId, config_json = ConfigJson, timeout = Timeout}), - gen_statem:call(Pid, {async_call, self(), ?PUSH_SERVICE_CONFIG, ConfigBin}). +-spec config_container(Pid :: pid(), ContainerName :: binary(), ConfigJson :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. +config_container(Pid, ContainerName, ConfigJson) when is_pid(Pid), is_binary(ContainerName), is_binary(ConfigJson) -> + EncConfigBin = message_codec:encode(?MESSAGE_RPC_CONTAINER, #rpc_container{method = <<"config">>, container_name = ContainerName, params = ConfigJson}), + gen_statem:call(Pid, {rpc_call, self(), EncConfigBin}). --spec deploy_service(Pid :: pid(), TaskId :: integer(), ServiceId :: binary(), TarUrl :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. -deploy_service(Pid, TaskId, ServiceId, TarUrl) when is_pid(Pid), is_integer(TaskId), is_binary(ServiceId), is_binary(TarUrl) -> - PushBin = message_pb:encode_msg(#deploy{task_id = TaskId, service_id = ServiceId, tar_url = TarUrl}), - gen_statem:call(Pid, {async_call, self(), ?PUSH_DEPLOY, PushBin}). +-spec deploy_container(Pid :: pid(), TaskId :: integer(), Config :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. +deploy_container(Pid, TaskId, Config) when is_pid(Pid), is_integer(TaskId), is_binary(Config) -> + EncDeployBin = message_codec:encode(?MESSAGE_RPC_DEPLOY, #rpc_deploy{task_id = TaskId, config = Config}), + gen_statem:call(Pid, {rpc_call, self(), ?PUSH_DEPLOY, EncDeployBin}). --spec start_service(Pid :: pid(), ServiceId :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. -start_service(Pid, ServiceId) when is_pid(Pid), is_binary(ServiceId) -> - gen_statem:call(Pid, {async_call, self(), ?PUSH_START_SERVICE, ServiceId}). +-spec start_container(Pid :: pid(), ContainerName :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. +start_container(Pid, ContainerName) when is_pid(Pid), is_binary(ContainerName) -> + EncCallBin = message_codec:encode(?MESSAGE_RPC_CONTAINER, #rpc_container{method = <<"start">>, container_name = ContainerName}), + gen_statem:call(Pid, {rpc_call, self(), EncCallBin}). --spec stop_service(Pid :: pid(), ServiceId :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. -stop_service(Pid, ServiceId) when is_pid(Pid), is_binary(ServiceId) -> - gen_statem:call(Pid, {async_call, self(), ?PUSH_STOP_SERVICE, ServiceId}). +-spec stop_container(Pid :: pid(), ContainerName :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. +stop_container(Pid, ContainerName) when is_pid(Pid), is_binary(ContainerName) -> + EncCallBin = message_codec:encode(?MESSAGE_RPC_CONTAINER, #rpc_container{method = <<"stop">>, container_name = ContainerName}), + gen_statem:call(Pid, {rpc_call, self(), EncCallBin}). --spec invoke_service(Pid :: pid(), ServiceId :: binary(), Payload :: binary(), Timeout :: integer()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. -invoke_service(Pid, ServiceId, Payload, Timeout) when is_pid(Pid), is_binary(ServiceId), is_binary(Payload), is_integer(Timeout) -> - InvokeBin = message_pb:encode_msg(#invoke{service_id = ServiceId, payload = Payload, timeout = Timeout}), - gen_statem:call(Pid, {async_call, self(), ?PUSH_INVOKE, InvokeBin}). - --spec task_log(Pid :: pid(), TaskId :: integer()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. -task_log(Pid, TaskId) when is_pid(Pid), is_integer(TaskId) -> - TaskLogBin = message_pb:encode_msg(#fetch_task_log{task_id = TaskId}), - gen_statem:call(Pid, {async_call, self(), ?PUSH_TASK_LOG, TaskLogBin}). +%-spec task_log(Pid :: pid(), TaskId :: integer()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. +%task_log(Pid, TaskId) when is_pid(Pid), is_integer(TaskId) -> +% TaskLogBin = message_pb:encode_msg(#fetch_task_log{task_id = TaskId}), +% gen_statem:call(Pid, {rpc_call, self(), ?PUSH_TASK_LOG, TaskLogBin}). -spec await_reply(Ref :: reference(), Timeout :: integer()) -> {ok, Result :: binary()} | {error, Reason :: binary()}. await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) -> receive - {async_call_reply, Ref, #async_call_reply{code = 1, result = Result}} -> + {rpc_reply, Ref, #rpc_reply{code = 1, payload = Result}} -> {ok, Result}; - {async_call_reply, Ref, #async_call_reply{code = 0, message = Message}} -> + {rpc_reply, Ref, #rpc_reply{code = 0, payload = Message}} -> {error, Message} after Timeout -> {error, <<"timeout">>} @@ -227,14 +224,14 @@ handle_event({call, From}, get_status, _, State = #state{channel_pid = ChannelPi {keep_state, State, [{reply, From, {ok, Reply}}]}; %% 只要channel存在,就负责将消息推送到边缘端主机 -handle_event({call, From}, {async_call, ReceiverPid, PushType, PushBin}, _, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) -> +handle_event({call, From}, {rpc_call, ReceiverPid, RpcCall}, _, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) -> case HasSession andalso is_pid(ChannelPid) of true -> %% 通过websocket发送请求 - Ref = tcp_channel:async_call(ChannelPid, ReceiverPid, PushType, PushBin), + Ref = tcp_channel:rpc_call(ChannelPid, ReceiverPid, RpcCall), {keep_state, State, [{reply, From, {ok, Ref}}]}; false -> - lager:debug("[iot_host] uuid: ~p, publish_type: ~p, invalid state: ~p", [UUID, PushType, state_map(State)]), + lager:debug("[iot_host] uuid: ~p, invalid state: ~p", [UUID, state_map(State)]), {keep_state, State, [{reply, From, {error, <<"主机离线,发送请求失败"/utf8>>}}]} end; @@ -372,10 +369,6 @@ handle_event(cast, {handle, {ping, Metrics}}, ?STATE_ACTIVATED, State = #state{u lager:debug("[iot_host] ping host_id uuid: ~p, get ping: ~p", [UUID, Metrics]), {keep_state, State#state{metrics = Metrics}}; -handle_event(cast, {handle, {inform, #service_inform{service_id = ServiceId, status = Status, timestamp = Timestamp}}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) -> - lager:debug("[iot_host] inform host: ~p, service_id: ~p, status: ~p, timestamp: ~p", [UUID, ServiceId, Status, Timestamp]), - {keep_state, State}; - handle_event(cast, {handle, {event, #event{service_id = ServiceId, event_type = EventType, params = Params}}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) -> lager:debug("[iot_host] event uuid: ~p, service_id: ~p, event_type: ~p, params: ~p", [UUID, ServiceId, EventType, Params]), %DevicePid = iot_device:get_pid(DeviceUUID), diff --git a/apps/iot/src/message/message_codec.erl b/apps/iot/src/message/message_codec.erl index e52442b..6e2927b 100644 --- a/apps/iot/src/message/message_codec.erl +++ b/apps/iot/src/message/message_codec.erl @@ -16,10 +16,10 @@ %% API -export([encode/2, decode/1]). --spec encode0(Message :: any()) -> binary(). -encode(PacketType, Packet) when is_integer(PacketType) -> - Bin = encode0(Packet), - <>. +-spec encode(MessageType :: integer(), Message :: any()) -> binary(). +encode(MessageType, Message) when is_integer(MessageType) -> + Bin = encode0(Message), + <>. encode0(#auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}) -> iolist_to_binary([ marshal(?Bytes, UUID), @@ -28,7 +28,7 @@ encode0(#auth_request{uuid = UUID, username = Username, salt = Salt, token = Tok marshal(?Bytes, Token), marshal(?I32, Timestamp) ]); -encode0(#auth_reply{code = Code, message = Message}) -> +encode0(#rpc_reply{code = Code, payload = Message}) -> iolist_to_binary([ marshal(?I32, Code), marshal(?Bytes, Message) @@ -78,8 +78,8 @@ decode(<>) -> end. decode0(?MESSAGE_AUTH_REQUEST, [UUID, Username, Salt, Token, Timestamp]) -> {ok, #auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}}; -decode0(?MESSAGE_AUTH_REPLY, [Code, Message]) -> - {ok, #auth_reply{code = Code, message = Message}}; +decode0(?MESSAGE_RPC_REPLY, [Code, Message]) -> + {ok, #rpc_reply{code = Code, payload = Message}}; decode0(?MESSAGE_PUB, [Topic, Content]) -> {ok, #pub{topic = Topic, content = Content}}; decode0(?MESSAGE_COMMAND, [CommandType, Command]) -> diff --git a/apps/iot/src/tcp/tcp_channel.erl b/apps/iot/src/tcp/tcp_channel.erl index 5fd52df..df2495a 100644 --- a/apps/iot/src/tcp/tcp_channel.erl +++ b/apps/iot/src/tcp/tcp_channel.erl @@ -11,10 +11,9 @@ -include("message.hrl"). %% API --export([pub/3, async_call/4, command/3]). --export([stop/2]). +-export([pub/3, rpc_call/3, command/3]). --export([start_link/2]). +-export([start_link/2, stop/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). @@ -43,10 +42,10 @@ command(Pid, CommandType, Command) when is_pid(Pid), is_integer(CommandType), is gen_server:cast(Pid, {command, CommandType, Command}). %% 向通道中写入消息 --spec async_call(Pid :: pid(), ReceiverPid :: pid(), CallType :: integer(), CallBin :: binary()) -> Ref :: reference(). -async_call(Pid, ReceiverPid, CallType, CallBin) when is_pid(Pid), is_pid(ReceiverPid), is_integer(CallType), is_binary(CallBin) -> +-spec rpc_call(Pid :: pid(), ReceiverPid :: pid(), CallBin :: binary()) -> Ref :: reference(). +rpc_call(Pid, ReceiverPid, CallBin) when is_pid(Pid), is_pid(ReceiverPid), is_binary(CallBin) -> Ref = make_ref(), - gen_server:cast(Pid, {async_call, ReceiverPid, Ref, CallType, CallBin}), + gen_server:cast(Pid, {rpc_call, ReceiverPid, Ref, CallBin}), Ref. %% 关闭方法 @@ -79,18 +78,19 @@ handle_call(_Request, _From, State) -> %% 发送消息, 基于pub/sub机制 handle_cast({pub, Topic, Content}, State = #state{transport = Transport, socket = Socket}) -> - PubBin = message_codec:encode(?MESSAGE_PUB, #pub{topic = Topic, content = Content}), - Transport:send(Socket, <>), + EncPub = message_codec:encode(?MESSAGE_PUB, #pub{topic = Topic, content = Content}), + Transport:send(Socket, <>), {noreply, State}; %% 发送Command消息 handle_cast({command, CommandType, Command}, State = #state{transport = Transport, socket = Socket}) -> - Transport:send(Socket, <>), + EncCommand = message_codec:encode(?MESSAGE_COMMAND, #command{command_type = CommandType, command = Command}), + Transport:send(Socket, <>), {noreply, State}; %% 推送消息 -handle_cast({async_call, ReceiverPid, Ref, CallType, CallBin}, State = #state{transport = Transport, socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - Transport:send(Socket, <>), +handle_cast({rpc_call, ReceiverPid, Ref, CallBin}, State = #state{transport = Transport, socket = Socket, packet_id = PacketId, inflight = Inflight}) -> + Transport:send(Socket, <>), {noreply, State#state{packet_id = PacketId + 1, inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}. %% auth验证 @@ -110,20 +110,20 @@ handle_info({tcp, Socket, <>}, ok -> %% 建立到host的monitor erlang:monitor(process, HostPid), - AuthReplyBin = message_codec:encode(?MESSAGE_AUTH_REPLY, #auth_reply{code = 0, message = <<"ok">>}), + AuthReplyBin = message_codec:encode(?MESSAGE_RPC_REPLY, #rpc_reply{code = 0, payload = <<"ok">>}), Transport:send(Socket, <>), {noreply, State#state{uuid = UUID, host_pid = HostPid}}; {denied, Reason} when is_binary(Reason) -> erlang:monitor(process, HostPid), - AuthReplyBin = message_codec:encode(?MESSAGE_AUTH_REPLY, #auth_reply{code = 1, message = Reason}), + AuthReplyBin = message_codec:encode(?MESSAGE_RPC_REPLY, #rpc_reply{code = 1, payload = Reason}), Transport:send(Socket, <>), lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]), {noreply, State#state{uuid = UUID, host_pid = HostPid}}; {error, Reason} when is_binary(Reason) -> - AuthReplyBin = message_codec:encode(?MESSAGE_AUTH_REPLY, #auth_reply{code = 2, message = Reason}), + AuthReplyBin = message_codec:encode(?MESSAGE_RPC_REPLY, #rpc_reply{code = 2, payload = Reason}), Transport:send(Socket, <>), lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]), @@ -151,7 +151,7 @@ handle_info({tcp, Socket, <>}, State = #state{sock % {noreply, State}; %% 主机端的消息响应 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, uuid = UUID, inflight = Inflight}) when PacketId > 0 -> +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, uuid = UUID, inflight = Inflight}) when PacketId > 0 -> {ok, RpcReply} = message_codec:decode(ResponseBin), case maps:take(PacketId, Inflight) of error -> @@ -166,10 +166,6 @@ handle_info({tcp, Socket, <> {noreply, State#state{inflight = NInflight}} end; -%% 来自efka的ping包 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket}) -> - {noreply, State}; - handle_info({tcp_error, Sock, Reason}, State = #state{socket = Sock}) -> lager:notice("[sdlan_channel] tcp_error: ~p", [Reason]), {stop, normal, State};