This commit is contained in:
anlicheng 2025-05-09 15:41:13 +08:00
parent aca0dbc7d4
commit fe87dabbb8
4 changed files with 49 additions and 44 deletions

View File

@ -8,31 +8,38 @@
%%%-------------------------------------------------------------------
-author("anlicheng").
%% efka主动发起的消息体类型
%% efka主动发起的消息体类型,
-define(PACKET_REQUEST, 16#01).
-define(PACKET_RESPONSE, 16#02).
%%
-define(METHOD_AUTH, 16#00).
-define(METHOD_DATA, 16#02).
-define(METHOD_PING, 16#03).
-define(METHOD_INFORM, 16#04).
-define(METHOD_EVENT, 16#07).
-define(METHOD_PHASE, 16#09).
%% pub/sub的消息
%% pub/sub的消息,
-define(PACKET_PUB, 16#03).
%% push调用不需要返回
%% push调用不需要返回,
-define(PACKET_COMMAND, 16#04).
%%
-define(COMMAND_AUTH, 16#08).
%%
-define(PACKET_PUSH, 16#05).
-define(PACKET_PUSH_REPLY, 16#06).
%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
-define(METHOD_AUTH, 16#01).
-define(METHOD_DATA, 16#02).
-define(METHOD_PING, 16#03).
-define(METHOD_INFORM, 16#04).
-define(METHOD_EVENT, 16#05).
-define(METHOD_PHASE, 16#06).
%%%% ,
%%
-define(COMMAND_AUTH, 16#08).
%%%% ,
%%
-define(PUSH_DEPLOY, 16#01).

View File

@ -275,7 +275,7 @@ handle_info({server_push, PacketId, Invoke = #invoke{service_id = ServiceId, pay
case efka_service:get_pid(ServiceId) of
undefined ->
Reply = #push_reply{code = 0, message = <<"micro_service not run">>, result = <<>>},
safe_response(PacketId, message_pb:encode_msg(Reply), State);
safe_push_response(PacketId, message_pb:encode_msg(Reply), State);
ServicePid when is_pid(ServicePid) ->
Ref = make_ref(),
efka_service:invoke(ServicePid, Ref, Payload),
@ -288,7 +288,7 @@ handle_info({server_push, PacketId, Invoke = #invoke{service_id = ServiceId, pay
{noreply, State};
%%
handle_info({command, ?COMMAND_AUTH, <<Auth:8>>}, State = #state{transport_pid = TransportPid, status = Status}) ->
handle_info({server_command, ?COMMAND_AUTH, <<Auth:8>>}, State = #state{transport_pid = TransportPid, status = Status}) ->
case {Auth, Status} of
{1, ?STATE_ACTIVATED} ->
{noreply, State};
@ -302,7 +302,7 @@ handle_info({command, ?COMMAND_AUTH, <<Auth:8>>}, State = #state{transport_pid =
end;
%%
handle_info({pub, Topic, Content}, State = #state{status = ?STATE_ACTIVATED}) ->
handle_info({server_pub, Topic, Content}, State = #state{status = ?STATE_ACTIVATED}) ->
efka_logger:debug("[efka_agent] get pub topic: ~p, content: ~p", [Topic, Content]),
%%
efka_subscription:publish(Topic, Content),
@ -321,7 +321,7 @@ handle_info({ems_reply, Ref, EmsReply}, State = #state{inflight = Inflight}) ->
{error, Reason} ->
#push_reply{code = 0, message = Reason}
end,
safe_response(PacketId, message_pb:encode_msg(Reply), State),
safe_push_response(PacketId, message_pb:encode_msg(Reply), State),
{noreply, State#state{inflight = NInflight}}
end;
@ -333,7 +333,7 @@ handle_info({timeout, _, {request_timeout, Ref}}, State = #state{inflight = Infl
{noreply, State};
{PacketId, NInflight} ->
Reply = #push_reply{code = 0, message = <<"reqeust timeout">>, result = <<>>},
safe_response(PacketId, message_pb:encode_msg(Reply), State),
safe_push_response(PacketId, message_pb:encode_msg(Reply), State),
{noreply, State#state{inflight = NInflight}}
end;
@ -370,10 +370,10 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
%%%===================================================================
%%
-spec safe_response(PacketId :: integer(), Reply :: binary(), State :: #state{}) -> no_return().
safe_response(PacketId, Reply, #state{status = ?STATE_ACTIVATED, transport_pid = TransportPid}) when is_integer(PacketId), is_binary(Reply), is_pid(TransportPid) ->
is_process_alive(TransportPid) andalso efka_transport:async_response(TransportPid, PacketId, Reply);
safe_response(_PacketId, _Reply, #state{}) ->
-spec safe_push_response(PacketId :: integer(), Reply :: binary(), State :: #state{}) -> no_return().
safe_push_response(PacketId, Reply, #state{status = ?STATE_ACTIVATED, transport_pid = TransportPid}) when is_integer(PacketId), is_binary(Reply), is_pid(TransportPid) ->
is_process_alive(TransportPid) andalso efka_transport:push_response(TransportPid, PacketId, Reply);
safe_push_response(_PacketId, _Reply, #state{}) ->
ok.
%%

View File

@ -15,7 +15,7 @@
%% API
-export([start_link/3]).
-export([connect/1, auth_request/2, send/3, async_response/3, stop/1]).
-export([connect/1, auth_request/2, send/3, push_response/3, stop/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@ -46,9 +46,9 @@ connect(Pid) when is_pid(Pid) ->
send(Pid, Method, Packet) when is_pid(Pid), is_integer(Method), is_binary(Packet) ->
gen_server:cast(Pid, {send, Method, Packet}).
-spec async_response(Pid :: pid(), PacketId :: integer(), Response :: binary()) -> no_return().
async_response(Pid, PacketId, Response) when is_pid(Pid), is_integer(PacketId), is_binary(Response) ->
gen_server:cast(Pid, {async_response, PacketId, Response}).
-spec push_response(Pid :: pid(), PacketId :: integer(), Response :: binary()) -> no_return().
push_response(Pid, PacketId, Response) when is_pid(Pid), is_integer(PacketId), is_binary(Response) ->
gen_server:cast(Pid, {push_response, PacketId, Response}).
-spec stop(Pid :: pid()) -> ok.
stop(Pid) when is_pid(Pid) ->
@ -138,8 +138,9 @@ handle_cast({send, Method, Packet}, State = #state{socket = Socket}) ->
ok = ssl:send(Socket, <<?PACKET_REQUEST, 0:32, Method, Packet/binary>>),
{noreply, State};
handle_cast({async_response, PacketId, Response}, State = #state{socket = Socket}) ->
ok = ssl:send(Socket, <<?PACKET_ASYNC_RESPONSE, PacketId:32, Response/binary>>),
%% push的消息的回复
handle_cast({push_response, PacketId, Response}, State = #state{socket = Socket}) ->
ok = ssl:send(Socket, <<?PACKET_PUSH_REPLY, PacketId:32, Response/binary>>),
{noreply, State}.
%% @private
@ -150,31 +151,27 @@ handle_cast({async_response, PacketId, Response}, State = #state{socket = Socket
{stop, Reason :: term(), NewState :: #state{}}).
%% packetId的是要求返回的0
handle_info({ssl, Socket, <<?PACKET_COMMAND, 0:32, CommandType:8, Command/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
efka_logger:debug("[efka_agent] socket get command_type: ~p, command: ~p", [CommandType, Command]),
ParentPid ! {command, CommandType, Command},
ParentPid ! {server_command, CommandType, Command},
{noreply, State};
handle_info({ssl, Socket, <<?PACKET_PUB, 0:32, PubBin/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
#pub{topic = Topic, content = Content} = message_pb:decode_msg(PubBin, pub),
efka_logger:debug("[efka_agent] socket get pub: ~p", [PubBin]),
ParentPid ! {pub, Topic, Content},
ParentPid ! {server_pub, Topic, Content},
{noreply, State};
%% : <<CommandType:8, Command/binary>>, <<16:8, Directive/binary>>
handle_info({ssl, Socket, <<?PACKET_ASYNC_REQUEST, PacketId:32, RequestBin/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
AsyncRequest = message_pb:decode_msg(RequestBin, async_request),
efka_logger:debug("[efka_agent] socket get async request: ~p", [AsyncRequest]),
ParentPid ! {async_request, PacketId, AsyncRequest},
handle_info({ssl, Socket, <<?PACKET_PUSH, PacketId:32, PushBin/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
ParentPid ! {server_push, PacketId, PushBin},
{noreply, State};
handle_info({ssl_error, Socket, _Reason}, State = #state{socket = Socket}) ->
handle_info({ssl_error, Socket, Reason}, State = #state{socket = Socket}) ->
lager:debug("[efka_transport] ssl error: ~p", [Reason]),
{stop, normal, State};
handle_info({ssl_closed, Socket}, State = #state{socket = Socket}) ->
{stop, normal, State};
handle_info(Info, State = #state{}) ->
efka_logger:debug("[efka_transport] info: ~p", [Info]),
lager:notice("[efka_transport] get unknown info: ~p", [Info]),
{noreply, State}.
%% @private
@ -185,7 +182,7 @@ handle_info(Info, State = #state{}) ->
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(Reason, #state{}) ->
efka_logger:debug("[efka_transport] terminate with reason: ~p", [Reason]),
efka_logger:notice("[efka_transport] terminate with reason: ~p", [Reason]),
ok.
%% @private

View File

@ -15,7 +15,8 @@ message AuthReply {
string message = 2;
}
// pub/sub的机制实现远程调用
// pub/sub的机制实现远程调用; service_id
// service_id主动订阅消息, 广
message Pub {
string topic = 1;
string content = 2;
@ -36,7 +37,7 @@ message Deploy {
string tar_url = 3;
}
// ; : $sys_前缀
// ,
message Invoke {
string service_id = 1;
string payload = 2;