fix efka
This commit is contained in:
parent
5b3e39c31b
commit
38b46874d3
@ -9,8 +9,8 @@
|
|||||||
-author("anlicheng").
|
-author("anlicheng").
|
||||||
|
|
||||||
%% efka主动发起的消息体类型, 消息大类
|
%% efka主动发起的消息体类型, 消息大类
|
||||||
-define(PACKET_RPC, 16#01).
|
-define(PACKET_REQUEST, 16#01).
|
||||||
-define(PACKET_RPC_REPLY, 16#02).
|
-define(PACKET_RESPONSE, 16#02).
|
||||||
|
|
||||||
%% efka主动发起不需要返回的数据
|
%% efka主动发起不需要返回的数据
|
||||||
-define(PACKET_CAST, 16#03).
|
-define(PACKET_CAST, 16#03).
|
||||||
@ -21,13 +21,15 @@
|
|||||||
|
|
||||||
%% 主机端上报数据类型标识
|
%% 主机端上报数据类型标识
|
||||||
-define(MESSAGE_AUTH_REQUEST, 16#01).
|
-define(MESSAGE_AUTH_REQUEST, 16#01).
|
||||||
-define(MESSAGE_AUTH_REPLY, 16#02).
|
-define(MESSAGE_PUB, 16#02).
|
||||||
-define(MESSAGE_PUB, 16#03).
|
-define(MESSAGE_COMMAND, 16#03).
|
||||||
-define(MESSAGE_COMMAND, 16#04).
|
-define(MESSAGE_RPC_DEPLOY, 16#04).
|
||||||
-define(MESSAGE_RPC_DEPLOY, 16#05).
|
-define(MESSAGE_RPC_CONTAINER, 16#05).
|
||||||
-define(MESSAGE_RPC_CONTAINER, 16#06).
|
-define(MESSAGE_DATA, 16#06).
|
||||||
-define(MESSAGE_DATA, 16#07).
|
-define(MESSAGE_EVENT, 16#07).
|
||||||
-define(MESSAGE_EVENT, 16#08).
|
|
||||||
|
%% 响应数据
|
||||||
|
-define(MESSAGE_RPC_REPLY, 16#FF).
|
||||||
|
|
||||||
%%%% 命令类型子分类, 不需要返回值
|
%%%% 命令类型子分类, 不需要返回值
|
||||||
%% 授权
|
%% 授权
|
||||||
@ -41,11 +43,6 @@
|
|||||||
timestamp :: integer()
|
timestamp :: integer()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-record(auth_reply, {
|
|
||||||
code :: integer(),
|
|
||||||
message :: binary()
|
|
||||||
}).
|
|
||||||
|
|
||||||
-record(pub, {
|
-record(pub, {
|
||||||
topic :: binary(),
|
topic :: binary(),
|
||||||
content :: binary()
|
content :: binary()
|
||||||
@ -67,6 +64,11 @@
|
|||||||
params = <<>> :: binary()
|
params = <<>> :: binary()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
-record(rpc_reply, {
|
||||||
|
code :: integer(),
|
||||||
|
payload :: binary()
|
||||||
|
}).
|
||||||
|
|
||||||
-record(data, {
|
-record(data, {
|
||||||
service_id :: binary(),
|
service_id :: binary(),
|
||||||
device_uuid :: binary(),
|
device_uuid :: binary(),
|
||||||
|
|||||||
@ -165,7 +165,7 @@ handle_event(info, {connect_reply, Reply}, ?STATE_CONNECTING, State = #state{tra
|
|||||||
|
|
||||||
handle_event(info, {auth_reply, Reply}, ?STATE_AUTH, State = #state{transport_pid = TransportPid}) ->
|
handle_event(info, {auth_reply, Reply}, ?STATE_AUTH, State = #state{transport_pid = TransportPid}) ->
|
||||||
case Reply of
|
case Reply of
|
||||||
{ok, #auth_reply{code = Code, message = Message}} ->
|
{ok, #rpc_reply{code = Code, payload = Message}} ->
|
||||||
case Code of
|
case Code of
|
||||||
0 ->
|
0 ->
|
||||||
lager:debug("[efka_remote_agent] auth success, message: ~p", [Message]),
|
lager:debug("[efka_remote_agent] auth success, message: ~p", [Message]),
|
||||||
@ -342,8 +342,8 @@ auth_request() ->
|
|||||||
|
|
||||||
-spec reply_success(Result :: binary()) -> binary().
|
-spec reply_success(Result :: binary()) -> binary().
|
||||||
reply_success(Result) when is_binary(Result) ->
|
reply_success(Result) when is_binary(Result) ->
|
||||||
<<1:8, Result/binary>>.
|
message_codec:encode(?MESSAGE_RPC_REPLY, #rpc_reply{code = 1, payload = Result}).
|
||||||
|
|
||||||
-spec reply_error(Message :: binary()) -> binary().
|
-spec reply_error(Message :: binary()) -> binary().
|
||||||
reply_error(Message) when is_binary(Message) ->
|
reply_error(Message) when is_binary(Message) ->
|
||||||
<<0:8, Message/binary>>.
|
message_codec:encode(?MESSAGE_RPC_REPLY, #rpc_reply{code = 0, payload = Message}).
|
||||||
@ -73,14 +73,14 @@ init([]) ->
|
|||||||
modules => ['docker_manager']
|
modules => ['docker_manager']
|
||||||
}
|
}
|
||||||
|
|
||||||
%#{
|
#{
|
||||||
% id => 'efka_remote_agent',
|
id => 'efka_remote_agent',
|
||||||
% start => {'efka_remote_agent', start_link, []},
|
start => {'efka_remote_agent', start_link, []},
|
||||||
% restart => permanent,
|
restart => permanent,
|
||||||
% shutdown => 2000,
|
shutdown => 2000,
|
||||||
% type => worker,
|
type => worker,
|
||||||
% modules => ['efka_remote_agent']
|
modules => ['efka_remote_agent']
|
||||||
%},
|
}
|
||||||
|
|
||||||
],
|
],
|
||||||
|
|
||||||
|
|||||||
@ -47,8 +47,8 @@ send(Pid, Packet) when is_pid(Pid), is_binary(Packet) ->
|
|||||||
-spec rpc_reply(Pid :: pid() | undefined, PacketId :: integer(), Response :: binary()) -> no_return().
|
-spec rpc_reply(Pid :: pid() | undefined, PacketId :: integer(), Response :: binary()) -> no_return().
|
||||||
rpc_reply(undefined, PacketId, Response) when is_integer(PacketId), is_binary(Response) ->
|
rpc_reply(undefined, PacketId, Response) when is_integer(PacketId), is_binary(Response) ->
|
||||||
ok;
|
ok;
|
||||||
rpc_reply(Pid, PacketId, Response) when is_pid(Pid), is_integer(PacketId), is_binary(Response) ->
|
rpc_reply(Pid, PacketId, Reply) when is_pid(Pid), is_integer(PacketId), is_binary(Reply) ->
|
||||||
gen_server:cast(Pid, {rpc_reply, PacketId, Response}).
|
gen_server:cast(Pid, {rpc_reply, PacketId, Reply}).
|
||||||
|
|
||||||
%% 关闭的时候不一定能成功,可能关闭的时候;transport进程已经退出了
|
%% 关闭的时候不一定能成功,可能关闭的时候;transport进程已经退出了
|
||||||
-spec stop(Pid :: pid() | undefined) -> ok.
|
-spec stop(Pid :: pid() | undefined) -> ok.
|
||||||
@ -114,11 +114,11 @@ handle_cast(connect, State = #state{host = Host, port = Port, parent_pid = Paren
|
|||||||
%% auth校验
|
%% auth校验
|
||||||
handle_cast({auth_request, AuthRequestBin}, State = #state{parent_pid = ParentPid, socket = Socket}) ->
|
handle_cast({auth_request, AuthRequestBin}, State = #state{parent_pid = ParentPid, socket = Socket}) ->
|
||||||
PacketId = 1,
|
PacketId = 1,
|
||||||
ok = ssl:send(Socket, <<?PACKET_RPC, PacketId:32, AuthRequestBin/binary>>),
|
ok = ssl:send(Socket, <<?PACKET_REQUEST, PacketId:32, AuthRequestBin/binary>>),
|
||||||
%% 需要等待auth返回的结果
|
%% 需要等待auth返回的结果
|
||||||
receive
|
receive
|
||||||
{ssl, Socket, <<?PACKET_RPC_REPLY, PacketId:32, ReplyBin/binary>>} ->
|
{ssl, Socket, <<?PACKET_RESPONSE, PacketId:32, ReplyBin/binary>>} ->
|
||||||
{ok, #auth_reply{} = Reply} = message_codec:decode(ReplyBin),
|
{ok, #rpc_reply{} = Reply} = message_codec:decode(ReplyBin),
|
||||||
ParentPid ! {auth_reply, {ok, Reply}},
|
ParentPid ! {auth_reply, {ok, Reply}},
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
{ssl, Socket, Info} ->
|
{ssl, Socket, Info} ->
|
||||||
@ -135,8 +135,8 @@ handle_cast({send, Packet}, State = #state{socket = Socket}) ->
|
|||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
%% 服务push的消息的回复
|
%% 服务push的消息的回复
|
||||||
handle_cast({rpc_reply, PacketId, Response}, State = #state{socket = Socket}) ->
|
handle_cast({rpc_reply, PacketId, Reply}, State = #state{socket = Socket}) ->
|
||||||
ok = ssl:send(Socket, <<?PACKET_RPC_REPLY, PacketId:32, Response/binary>>),
|
ok = ssl:send(Socket, <<?PACKET_RESPONSE, PacketId:32, Reply/binary>>),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
@ -151,7 +151,7 @@ handle_info({ssl, Socket, <<?PACKET_CAST, CommandBin/binary>>}, State = #state{s
|
|||||||
ParentPid ! {server_cast, CastRequest},
|
ParentPid ! {server_cast, CastRequest},
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_info({ssl, Socket, <<?PACKET_RPC, PacketId:32, RPCRequestBin/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
|
handle_info({ssl, Socket, <<?PACKET_REQUEST, PacketId:32, RPCRequestBin/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
|
||||||
{ok, RPCRequest} = message_codec:decode(RPCRequestBin),
|
{ok, RPCRequest} = message_codec:decode(RPCRequestBin),
|
||||||
ParentPid ! {server_rpc, PacketId, RPCRequest},
|
ParentPid ! {server_rpc, PacketId, RPCRequest},
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|||||||
@ -16,10 +16,10 @@
|
|||||||
%% API
|
%% API
|
||||||
-export([encode/2, decode/1]).
|
-export([encode/2, decode/1]).
|
||||||
|
|
||||||
-spec encode0(Message :: any()) -> binary().
|
-spec encode(MessageType :: integer(), Message :: any()) -> binary().
|
||||||
encode(PacketType, Packet) when is_integer(PacketType) ->
|
encode(MessageType, Message) when is_integer(MessageType) ->
|
||||||
Bin = encode0(Packet),
|
Bin = encode0(Message),
|
||||||
<<PacketType, Bin/binary>>.
|
<<MessageType, Bin/binary>>.
|
||||||
encode0(#auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}) ->
|
encode0(#auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}) ->
|
||||||
iolist_to_binary([
|
iolist_to_binary([
|
||||||
marshal(?Bytes, UUID),
|
marshal(?Bytes, UUID),
|
||||||
@ -28,7 +28,7 @@ encode0(#auth_request{uuid = UUID, username = Username, salt = Salt, token = Tok
|
|||||||
marshal(?Bytes, Token),
|
marshal(?Bytes, Token),
|
||||||
marshal(?I32, Timestamp)
|
marshal(?I32, Timestamp)
|
||||||
]);
|
]);
|
||||||
encode0(#auth_reply{code = Code, message = Message}) ->
|
encode0(#rpc_reply{code = Code, payload = Message}) ->
|
||||||
iolist_to_binary([
|
iolist_to_binary([
|
||||||
marshal(?I32, Code),
|
marshal(?I32, Code),
|
||||||
marshal(?Bytes, Message)
|
marshal(?Bytes, Message)
|
||||||
@ -78,8 +78,8 @@ decode(<<PacketType:8, Packet/binary>>) ->
|
|||||||
end.
|
end.
|
||||||
decode0(?MESSAGE_AUTH_REQUEST, [UUID, Username, Salt, Token, Timestamp]) ->
|
decode0(?MESSAGE_AUTH_REQUEST, [UUID, Username, Salt, Token, Timestamp]) ->
|
||||||
{ok, #auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}};
|
{ok, #auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}};
|
||||||
decode0(?MESSAGE_AUTH_REPLY, [Code, Message]) ->
|
decode0(?MESSAGE_RPC_REPLY, [Code, Message]) ->
|
||||||
{ok, #auth_reply{code = Code, message = Message}};
|
{ok, #rpc_reply{code = Code, payload = Message}};
|
||||||
decode0(?MESSAGE_PUB, [Topic, Content]) ->
|
decode0(?MESSAGE_PUB, [Topic, Content]) ->
|
||||||
{ok, #pub{topic = Topic, content = Content}};
|
{ok, #pub{topic = Topic, content = Content}};
|
||||||
decode0(?MESSAGE_COMMAND, [CommandType, Command]) ->
|
decode0(?MESSAGE_COMMAND, [CommandType, Command]) ->
|
||||||
Loading…
x
Reference in New Issue
Block a user