This commit is contained in:
anlicheng 2025-09-17 17:41:28 +08:00
parent 08aaf01016
commit 0bb34c50bb
4 changed files with 45 additions and 43 deletions

View File

@ -33,10 +33,14 @@
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
-define(METHOD_AUTH, 16#01).
-define(METHOD_DATA, 16#02).
-define(METHOD_EVENT, 16#03).
-define(METHOD_PING, 16#04).
-define(MESSAGE_AUTH_REQUEST, 16#01).
-define(MESSAGE_AUTH_REPLY, 16#02).
-define(MESSAGE_PUB, 16#03).
-define(MESSAGE_COMMAND, 16#04).
-define(MESSAGE_RPC_DEPLOY, 16#05).
-define(MESSAGE_RPC_CONTAINER, 16#06).
-define(MESSAGE_DATA, 16#07).
-define(MESSAGE_EVENT, 16#08).
%%%% ,
%%

View File

@ -72,21 +72,21 @@ encode0(#event{service_id = ServiceId, event_type = EventType, params = Params})
decode(<<PacketType:8, Packet/binary>>) ->
Fields = unmarshal(Packet),
decode0(PacketType, Fields).
decode0(?I32, [UUID, Username, Salt, Token, Timestamp]) ->
decode0(?MESSAGE_AUTH_REQUEST, [UUID, Username, Salt, Token, Timestamp]) ->
#auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp};
decode0(?I32, [Code, Message]) ->
decode0(?MESSAGE_AUTH_REPLY, [Code, Message]) ->
#auth_reply{code = Code, message = Message};
decode0(?I32, [Topic, Content]) ->
decode0(?MESSAGE_PUB, [Topic, Content]) ->
#pub{topic = Topic, content = Content};
decode0(?I32, [CommandType, Command]) ->
decode0(?MESSAGE_COMMAND, [CommandType, Command]) ->
#command{command_type = CommandType, command = Command};
decode0(?I32, [TaskId, Config]) ->
decode0(?MESSAGE_RPC_DEPLOY, [TaskId, Config]) ->
#rpc_deploy{task_id = TaskId, config = Config};
decode0(?I32, [Method, ContainerName, Params]) ->
decode0(?MESSAGE_RPC_CONTAINER, [Method, ContainerName, Params]) ->
#rpc_container{method = Method, container_name = ContainerName, params = Params};
decode0(?I32, [ServiceId, DeviceUUID, RouteKey, Metric]) ->
decode0(?MESSAGE_DATA, [ServiceId, DeviceUUID, RouteKey, Metric]) ->
#data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric};
decode0(?I32, [ServiceId, EventType, Params]) ->
decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) ->
#event{service_id = ServiceId, event_type = EventType, params = Params}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

View File

@ -83,7 +83,7 @@ callback_mode() ->
%% , mnesia
handle_event(cast, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
Packet = efka_codec:encode(?METHOD_DATA, #data{
Packet = efka_codec:encode(?MESSAGE_DATA, #data{
service_id = ServiceId,
device_uuid = DeviceUUID,
route_key = RouteKey,
@ -93,7 +93,7 @@ handle_event(cast, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}, ?STAT
{keep_state, State};
handle_event(cast, {metric_data, ServiceId, DeviceUUID, LineProtocolData}, _, State) ->
Packet = efka_codec:encode(?METHOD_DATA, #data{
Packet = efka_codec:encode(?MESSAGE_DATA, #data{
service_id = ServiceId,
device_uuid = DeviceUUID,
metric = LineProtocolData
@ -103,7 +103,7 @@ handle_event(cast, {metric_data, ServiceId, DeviceUUID, LineProtocolData}, _, St
%%
handle_event(cast, {event, ServiceId, EventType, Params}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
EventPacket = efka_codec:encode(?METHOD_EVENT, #event{
EventPacket = efka_codec:encode(?MESSAGE_EVENT, #event{
service_id = ServiceId,
event_type = EventType,
params = Params
@ -111,12 +111,12 @@ handle_event(cast, {event, ServiceId, EventType, Params}, ?STATE_ACTIVATED, Stat
efka_transport:send(TransportPid, EventPacket),
{keep_state, State};
handle_event(cast, {event, ServiceId, EventType, Params}, ?STATE_ACTIVATED, State) ->
EventPacket = efka_codec:encode(?METHOD_EVENT, #event{
EventPacket = efka_codec:encode(?MESSAGE_EVENT, #event{
service_id = ServiceId,
event_type = EventType,
params = Params
}),
ok = cache_model:insert(?METHOD_EVENT, EventPacket),
ok = cache_model:insert(EventPacket),
{keep_state, State};
%handle_event(cast, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}, ?STATE_ACTIVATED,
@ -164,8 +164,7 @@ handle_event(info, {connect_reply, Reply}, ?STATE_CONNECTING, State = #state{tra
handle_event(info, {auth_reply, Reply}, ?STATE_AUTH, State = #state{transport_pid = TransportPid}) ->
case Reply of
{ok, ReplyBin} ->
#auth_reply{code = Code, message = Message} = message_pb:decode_msg(ReplyBin, auth_reply),
{ok, #auth_reply{code = Code, message = Message}} ->
case Code of
0 ->
lager:debug("[efka_remote_agent] auth success, message: ~p", [Message]),
@ -209,9 +208,7 @@ handle_event(info, flush_cache, _, State) ->
%%
%%
handle_event(info, {server_rpc, PacketId, <<?RPC_DEPLOY:8, DeployBin/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
#deploy{task_id = TaskId, config = Config0} = message_pb:decode_msg(DeployBin, deploy),
handle_event(info, {server_rpc, PacketId, #rpc_deploy{task_id = TaskId, config = Config0}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
case catch jiffy:decode(Config0, [return_maps]) of
Config when is_map(Config) ->
%% efka_inetd收到消息后就立即返回了
@ -227,9 +224,9 @@ handle_event(info, {server_rpc, PacketId, <<?RPC_DEPLOY:8, DeployBin/binary>>},
{keep_state, State};
%%
handle_event(info, {server_rpc, PacketId, <<?RPC_START_CONTAINER:8, ServiceId/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
handle_event(info, {server_rpc, PacketId, #rpc_container{method = <<"start">>, container_name = ContainerName}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
%% efka_inetd收到消息后就立即返回了
case docker_manager:start_container(ServiceId) of
case docker_manager:start_container(ContainerName) of
ok ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>));
{error, Reason} when is_binary(Reason) ->
@ -238,9 +235,9 @@ handle_event(info, {server_rpc, PacketId, <<?RPC_START_CONTAINER:8, ServiceId/bi
{keep_state, State};
%%
handle_event(info, {server_rpc, PacketId, <<?RPC_STOP_CONTAINER:8, ServiceId/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
handle_event(info, {server_rpc, PacketId, #rpc_container{method = <<"stop">>, container_name = ContainerName}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
%% efka_inetd收到消息后就立即返回了
case docker_manager:stop_container(ServiceId) of
case docker_manager:stop_container(ContainerName) of
ok ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>));
{error, Reason} when is_binary(Reason) ->
@ -249,9 +246,7 @@ handle_event(info, {server_rpc, PacketId, <<?RPC_STOP_CONTAINER:8, ServiceId/bin
{keep_state, State};
%% config.json配置信息
handle_event(info, {server_rpc, PacketId, <<?RPC_CONFIG_CONTAINER:8, ConfigBin/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
#container_config{container_name = ContainerName, config = Config} = message_pb:decode_msg(ConfigBin, push_service_config),
handle_event(info, {server_rpc, PacketId, #rpc_container{method = <<"config">>, container_name = ContainerName, params = Config}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
case docker_manager:config_container(ContainerName, Config) of
ok ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>));
@ -277,7 +272,7 @@ handle_event(info, {server_rpc, PacketId, <<?RPC_CONFIG_CONTAINER:8, ConfigBin/b
% {keep_state, State};
%%
handle_event(info, {server_command, ?COMMAND_AUTH, <<Auth:8>>}, StateName, State = #state{transport_pid = TransportPid}) ->
handle_event(info, {server_command, #command{command_type = ?COMMAND_AUTH, command = Auth}}, StateName, State = #state{transport_pid = TransportPid}) ->
case {Auth, StateName} of
{1, ?STATE_ACTIVATED} ->
{keep_state, State};
@ -335,7 +330,7 @@ auth_request() ->
Salt = proplists:get_value(salt, AuthInfo),
Token = proplists:get_value(token, AuthInfo),
efka_codec:encode(?METHOD_AUTH, #auth_request{
efka_codec:encode(?MESSAGE_AUTH_REQUEST, #auth_request{
uuid = unicode:characters_to_binary(UUID),
username = unicode:characters_to_binary(Username),
salt = unicode:characters_to_binary(Salt),

View File

@ -14,7 +14,7 @@
%% API
-export([start_monitor/3]).
-export([connect/1, auth_request/2, send/3, rpc_reply/3, stop/1]).
-export([connect/1, auth_request/2, send/2, rpc_reply/3, stop/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@ -40,9 +40,9 @@ auth_request(Pid, AuthBin) when is_pid(Pid), is_binary(AuthBin) ->
connect(Pid) when is_pid(Pid) ->
gen_server:cast(Pid, connect).
-spec send(Pid :: pid(), Method :: integer(), Packet :: binary()) -> no_return().
send(Pid, Method, Packet) when is_pid(Pid), is_integer(Method), is_binary(Packet) ->
gen_server:cast(Pid, {send, Method, Packet}).
-spec send(Pid :: pid(), Packet :: binary()) -> no_return().
send(Pid, Packet) when is_pid(Pid), is_binary(Packet) ->
gen_server:cast(Pid, {send, Packet}).
-spec rpc_reply(Pid :: pid() | undefined, PacketId :: integer(), Response :: binary()) -> no_return().
rpc_reply(undefined, PacketId, Response) when is_integer(PacketId), is_binary(Response) ->
@ -118,7 +118,8 @@ handle_cast({auth_request, AuthRequestBin}, State = #state{parent_pid = ParentPi
%% auth返回的结果
receive
{ssl, Socket, <<?PACKET_RESPONSE, PacketId:32, ReplyBin/binary>>} ->
ParentPid ! {auth_reply, {ok, ReplyBin}},
Reply = efka_codec:decode(ReplyBin),
ParentPid ! {auth_reply, {ok, Reply}},
{noreply, State};
{ssl, Socket, Info} ->
lager:warning("[efka_transport] get invalid auth_reply: ~p", [Info]),
@ -129,8 +130,8 @@ handle_cast({auth_request, AuthRequestBin}, State = #state{parent_pid = ParentPi
{noreply, State}
end;
handle_cast({send, Method, Packet}, State = #state{socket = Socket}) ->
ok = ssl:send(Socket, <<?PACKET_CAST, Method:8, Packet/binary>>),
handle_cast({send, Packet}, State = #state{socket = Socket}) ->
ok = ssl:send(Socket, <<?PACKET_CAST, Packet/binary>>),
{noreply, State};
%% push的消息的回复
@ -145,16 +146,18 @@ handle_cast({rpc_reply, PacketId, Response}, State = #state{socket = Socket}) ->
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
%% packetId的是要求返回的0
handle_info({ssl, Socket, <<?PACKET_COMMAND, CommandType:8, Command/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
ParentPid ! {server_command, CommandType, Command},
handle_info({ssl, Socket, <<?PACKET_COMMAND, CommandBin/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
Command = efka_codec:decode(CommandBin),
ParentPid ! {server_command, Command},
{noreply, State};
handle_info({ssl, Socket, <<?PACKET_PUB, PubBin/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
#pub{topic = Topic, content = Content} = message_pb:decode_msg(PubBin, pub),
ParentPid ! {server_pub, Topic, Content},
Pub = efka_codec:decode(PubBin),
ParentPid ! {server_pub, Pub},
{noreply, State};
handle_info({ssl, Socket, <<?PACKET_RPC, PacketId:32, RPCRequest/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
handle_info({ssl, Socket, <<?PACKET_RPC, PacketId:32, RPCRequestBin/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
RPCRequest = efka_codec:decode(RPCRequestBin),
ParentPid ! {server_rpc, PacketId, RPCRequest},
{noreply, State};