diff --git a/apps/efka/include/efka.hrl b/apps/efka/include/efka.hrl index 1586aa7..ea6c056 100644 --- a/apps/efka/include/efka.hrl +++ b/apps/efka/include/efka.hrl @@ -8,33 +8,40 @@ %%%------------------------------------------------------------------- -author("anlicheng"). -%% efka主动发起的消息体类型 +%% efka主动发起的消息体类型, 消息大类 -define(PACKET_REQUEST, 16#01). -define(PACKET_RESPONSE, 16#02). -%% 主机端上报数据类型标识 --define(METHOD_AUTH, 16#00). --define(METHOD_DATA, 16#02). --define(METHOD_PING, 16#03). --define(METHOD_INFORM, 16#04). --define(METHOD_EVENT, 16#07). --define(METHOD_PHASE, 16#09). -%% 服务器基于pub/sub的消息 +%% 服务器基于pub/sub的消息, 消息大类 -define(PACKET_PUB, 16#03). -%% push调用不需要返回 +%% push调用不需要返回, 消息大类 -define(PACKET_COMMAND, 16#04). -%% 授权 --define(COMMAND_AUTH, 16#08). - %% 服务器端推送消息 -define(PACKET_PUSH, 16#05). -define(PACKET_PUSH_REPLY, 16#06). -%% 主动推送的消息类型 +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%%% 二级分类定义 +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% 主机端上报数据类型标识 +-define(METHOD_AUTH, 16#01). +-define(METHOD_DATA, 16#02). +-define(METHOD_PING, 16#03). +-define(METHOD_INFORM, 16#04). +-define(METHOD_EVENT, 16#05). +-define(METHOD_PHASE, 16#06). + +%%%% 命令类型子分类, 不需要返回值 + +%% 授权 +-define(COMMAND_AUTH, 16#08). + +%%%% 主动推送的消息类型子分类, 需要返回值 %% 部署微服务 -define(PUSH_DEPLOY, 16#01). %% 服务配置 --define(PUSH_SERVICE_CONFIG, 16#02). +-define(PUSH_SERVICE_CONFIG, 16#02). \ No newline at end of file diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index d6b7182..74e5d93 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -275,7 +275,7 @@ handle_info({server_push, PacketId, Invoke = #invoke{service_id = ServiceId, pay case efka_service:get_pid(ServiceId) of undefined -> Reply = #push_reply{code = 0, message = <<"micro_service not run">>, result = <<>>}, - safe_response(PacketId, message_pb:encode_msg(Reply), State); + safe_push_response(PacketId, message_pb:encode_msg(Reply), State); ServicePid when is_pid(ServicePid) -> Ref = make_ref(), efka_service:invoke(ServicePid, Ref, Payload), @@ -288,7 +288,7 @@ handle_info({server_push, PacketId, Invoke = #invoke{service_id = ServiceId, pay {noreply, State}; %% 处理命令 -handle_info({command, ?COMMAND_AUTH, <>}, State = #state{transport_pid = TransportPid, status = Status}) -> +handle_info({server_command, ?COMMAND_AUTH, <>}, State = #state{transport_pid = TransportPid, status = Status}) -> case {Auth, Status} of {1, ?STATE_ACTIVATED} -> {noreply, State}; @@ -302,7 +302,7 @@ handle_info({command, ?COMMAND_AUTH, <>}, State = #state{transport_pid = end; %% 收到需要回复的指令 -handle_info({pub, Topic, Content}, State = #state{status = ?STATE_ACTIVATED}) -> +handle_info({server_pub, Topic, Content}, State = #state{status = ?STATE_ACTIVATED}) -> efka_logger:debug("[efka_agent] get pub topic: ~p, content: ~p", [Topic, Content]), %% 消息发送到订阅系统 efka_subscription:publish(Topic, Content), @@ -321,7 +321,7 @@ handle_info({ems_reply, Ref, EmsReply}, State = #state{inflight = Inflight}) -> {error, Reason} -> #push_reply{code = 0, message = Reason} end, - safe_response(PacketId, message_pb:encode_msg(Reply), State), + safe_push_response(PacketId, message_pb:encode_msg(Reply), State), {noreply, State#state{inflight = NInflight}} end; @@ -333,7 +333,7 @@ handle_info({timeout, _, {request_timeout, Ref}}, State = #state{inflight = Infl {noreply, State}; {PacketId, NInflight} -> Reply = #push_reply{code = 0, message = <<"reqeust timeout">>, result = <<>>}, - safe_response(PacketId, message_pb:encode_msg(Reply), State), + safe_push_response(PacketId, message_pb:encode_msg(Reply), State), {noreply, State#state{inflight = NInflight}} end; @@ -370,10 +370,10 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%%=================================================================== %% 安全回复 --spec safe_response(PacketId :: integer(), Reply :: binary(), State :: #state{}) -> no_return(). -safe_response(PacketId, Reply, #state{status = ?STATE_ACTIVATED, transport_pid = TransportPid}) when is_integer(PacketId), is_binary(Reply), is_pid(TransportPid) -> - is_process_alive(TransportPid) andalso efka_transport:async_response(TransportPid, PacketId, Reply); -safe_response(_PacketId, _Reply, #state{}) -> +-spec safe_push_response(PacketId :: integer(), Reply :: binary(), State :: #state{}) -> no_return(). +safe_push_response(PacketId, Reply, #state{status = ?STATE_ACTIVATED, transport_pid = TransportPid}) when is_integer(PacketId), is_binary(Reply), is_pid(TransportPid) -> + is_process_alive(TransportPid) andalso efka_transport:push_response(TransportPid, PacketId, Reply); +safe_push_response(_PacketId, _Reply, #state{}) -> ok. %% 当连接正常的时候发送,否则暂存数据 diff --git a/apps/efka/src/efka_transport.erl b/apps/efka/src/efka_transport.erl index 5037787..9bcec2b 100644 --- a/apps/efka/src/efka_transport.erl +++ b/apps/efka/src/efka_transport.erl @@ -15,7 +15,7 @@ %% API -export([start_link/3]). --export([connect/1, auth_request/2, send/3, async_response/3, stop/1]). +-export([connect/1, auth_request/2, send/3, push_response/3, stop/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -46,9 +46,9 @@ connect(Pid) when is_pid(Pid) -> send(Pid, Method, Packet) when is_pid(Pid), is_integer(Method), is_binary(Packet) -> gen_server:cast(Pid, {send, Method, Packet}). --spec async_response(Pid :: pid(), PacketId :: integer(), Response :: binary()) -> no_return(). -async_response(Pid, PacketId, Response) when is_pid(Pid), is_integer(PacketId), is_binary(Response) -> - gen_server:cast(Pid, {async_response, PacketId, Response}). +-spec push_response(Pid :: pid(), PacketId :: integer(), Response :: binary()) -> no_return(). +push_response(Pid, PacketId, Response) when is_pid(Pid), is_integer(PacketId), is_binary(Response) -> + gen_server:cast(Pid, {push_response, PacketId, Response}). -spec stop(Pid :: pid()) -> ok. stop(Pid) when is_pid(Pid) -> @@ -138,8 +138,9 @@ handle_cast({send, Method, Packet}, State = #state{socket = Socket}) -> ok = ssl:send(Socket, <>), {noreply, State}; -handle_cast({async_response, PacketId, Response}, State = #state{socket = Socket}) -> - ok = ssl:send(Socket, <>), +%% 服务push的消息的回复 +handle_cast({push_response, PacketId, Response}, State = #state{socket = Socket}) -> + ok = ssl:send(Socket, <>), {noreply, State}. %% @private @@ -150,31 +151,27 @@ handle_cast({async_response, PacketId, Response}, State = #state{socket = Socket {stop, Reason :: term(), NewState :: #state{}}). %% 服务器主动推送的数据,有packetId的是要求返回的;为0的表示不需要返回值 handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> - efka_logger:debug("[efka_agent] socket get command_type: ~p, command: ~p", [CommandType, Command]), - ParentPid ! {command, CommandType, Command}, + ParentPid ! {server_command, CommandType, Command}, {noreply, State}; handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> #pub{topic = Topic, content = Content} = message_pb:decode_msg(PubBin, pub), - efka_logger:debug("[efka_agent] socket get pub: ~p", [PubBin]), - ParentPid ! {pub, Topic, Content}, + ParentPid ! {server_pub, Topic, Content}, {noreply, State}; -%% 目前推送的消息包括: <>, <<16:8, Directive/binary>> -handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> - AsyncRequest = message_pb:decode_msg(RequestBin, async_request), - efka_logger:debug("[efka_agent] socket get async request: ~p", [AsyncRequest]), - ParentPid ! {async_request, PacketId, AsyncRequest}, +handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> + ParentPid ! {server_push, PacketId, PushBin}, {noreply, State}; -handle_info({ssl_error, Socket, _Reason}, State = #state{socket = Socket}) -> +handle_info({ssl_error, Socket, Reason}, State = #state{socket = Socket}) -> + lager:debug("[efka_transport] ssl error: ~p", [Reason]), {stop, normal, State}; handle_info({ssl_closed, Socket}, State = #state{socket = Socket}) -> {stop, normal, State}; handle_info(Info, State = #state{}) -> - efka_logger:debug("[efka_transport] info: ~p", [Info]), + lager:notice("[efka_transport] get unknown info: ~p", [Info]), {noreply, State}. %% @private @@ -185,7 +182,7 @@ handle_info(Info, State = #state{}) -> -spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), State :: #state{}) -> term()). terminate(Reason, #state{}) -> - efka_logger:debug("[efka_transport] terminate with reason: ~p", [Reason]), + efka_logger:notice("[efka_transport] terminate with reason: ~p", [Reason]), ok. %% @private diff --git a/message_pb.proto b/message_pb.proto index 3831139..f51bc15 100644 --- a/message_pb.proto +++ b/message_pb.proto @@ -15,7 +15,8 @@ message AuthReply { string message = 2; } -// 不需要响应的,基于pub/sub的机制实现远程调用 +// 不需要响应的,基于pub/sub的机制实现远程调用; 不需要指定service_id +// service_id主动订阅消息, 基于广播通讯 message Pub { string topic = 1; string content = 2; @@ -36,7 +37,7 @@ message Deploy { string tar_url = 3; } -// 需要响应; 系统内部的调用采用: $sys_前缀 +// 需要响应, 云端主动发起的调用 message Invoke { string service_id = 1; string payload = 2;