diff --git a/apps/efka/include/efka.hrl b/apps/efka/include/efka.hrl index 5befe01..142ac02 100644 --- a/apps/efka/include/efka.hrl +++ b/apps/efka/include/efka.hrl @@ -33,10 +33,14 @@ %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% 主机端上报数据类型标识 --define(METHOD_AUTH, 16#01). --define(METHOD_DATA, 16#02). --define(METHOD_EVENT, 16#03). --define(METHOD_PING, 16#04). +-define(MESSAGE_AUTH_REQUEST, 16#01). +-define(MESSAGE_AUTH_REPLY, 16#02). +-define(MESSAGE_PUB, 16#03). +-define(MESSAGE_COMMAND, 16#04). +-define(MESSAGE_RPC_DEPLOY, 16#05). +-define(MESSAGE_RPC_CONTAINER, 16#06). +-define(MESSAGE_DATA, 16#07). +-define(MESSAGE_EVENT, 16#08). %%%% 命令类型子分类, 不需要返回值 %% 授权 diff --git a/apps/efka/src/efka_codec.erl b/apps/efka/src/efka_codec.erl index 63be82c..0e0760d 100644 --- a/apps/efka/src/efka_codec.erl +++ b/apps/efka/src/efka_codec.erl @@ -72,21 +72,21 @@ encode0(#event{service_id = ServiceId, event_type = EventType, params = Params}) decode(<>) -> Fields = unmarshal(Packet), decode0(PacketType, Fields). -decode0(?I32, [UUID, Username, Salt, Token, Timestamp]) -> +decode0(?MESSAGE_AUTH_REQUEST, [UUID, Username, Salt, Token, Timestamp]) -> #auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}; -decode0(?I32, [Code, Message]) -> +decode0(?MESSAGE_AUTH_REPLY, [Code, Message]) -> #auth_reply{code = Code, message = Message}; -decode0(?I32, [Topic, Content]) -> +decode0(?MESSAGE_PUB, [Topic, Content]) -> #pub{topic = Topic, content = Content}; -decode0(?I32, [CommandType, Command]) -> +decode0(?MESSAGE_COMMAND, [CommandType, Command]) -> #command{command_type = CommandType, command = Command}; -decode0(?I32, [TaskId, Config]) -> +decode0(?MESSAGE_RPC_DEPLOY, [TaskId, Config]) -> #rpc_deploy{task_id = TaskId, config = Config}; -decode0(?I32, [Method, ContainerName, Params]) -> +decode0(?MESSAGE_RPC_CONTAINER, [Method, ContainerName, Params]) -> #rpc_container{method = Method, container_name = ContainerName, params = Params}; -decode0(?I32, [ServiceId, DeviceUUID, RouteKey, Metric]) -> +decode0(?MESSAGE_DATA, [ServiceId, DeviceUUID, RouteKey, Metric]) -> #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}; -decode0(?I32, [ServiceId, EventType, Params]) -> +decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) -> #event{service_id = ServiceId, event_type = EventType, params = Params}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/apps/efka/src/efka_remote_agent.erl b/apps/efka/src/efka_remote_agent.erl index e9b51da..8cbe910 100644 --- a/apps/efka/src/efka_remote_agent.erl +++ b/apps/efka/src/efka_remote_agent.erl @@ -83,7 +83,7 @@ callback_mode() -> %% 异步发送数据, 连接存在时候直接发送;否则缓存到mnesia handle_event(cast, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> - Packet = efka_codec:encode(?METHOD_DATA, #data{ + Packet = efka_codec:encode(?MESSAGE_DATA, #data{ service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, @@ -93,7 +93,7 @@ handle_event(cast, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}, ?STAT {keep_state, State}; handle_event(cast, {metric_data, ServiceId, DeviceUUID, LineProtocolData}, _, State) -> - Packet = efka_codec:encode(?METHOD_DATA, #data{ + Packet = efka_codec:encode(?MESSAGE_DATA, #data{ service_id = ServiceId, device_uuid = DeviceUUID, metric = LineProtocolData @@ -103,7 +103,7 @@ handle_event(cast, {metric_data, ServiceId, DeviceUUID, LineProtocolData}, _, St %% 异步发送事件 handle_event(cast, {event, ServiceId, EventType, Params}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> - EventPacket = efka_codec:encode(?METHOD_EVENT, #event{ + EventPacket = efka_codec:encode(?MESSAGE_EVENT, #event{ service_id = ServiceId, event_type = EventType, params = Params @@ -111,12 +111,12 @@ handle_event(cast, {event, ServiceId, EventType, Params}, ?STATE_ACTIVATED, Stat efka_transport:send(TransportPid, EventPacket), {keep_state, State}; handle_event(cast, {event, ServiceId, EventType, Params}, ?STATE_ACTIVATED, State) -> - EventPacket = efka_codec:encode(?METHOD_EVENT, #event{ + EventPacket = efka_codec:encode(?MESSAGE_EVENT, #event{ service_id = ServiceId, event_type = EventType, params = Params }), - ok = cache_model:insert(?METHOD_EVENT, EventPacket), + ok = cache_model:insert(EventPacket), {keep_state, State}; %handle_event(cast, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}, ?STATE_ACTIVATED, @@ -164,8 +164,7 @@ handle_event(info, {connect_reply, Reply}, ?STATE_CONNECTING, State = #state{tra handle_event(info, {auth_reply, Reply}, ?STATE_AUTH, State = #state{transport_pid = TransportPid}) -> case Reply of - {ok, ReplyBin} -> - #auth_reply{code = Code, message = Message} = message_pb:decode_msg(ReplyBin, auth_reply), + {ok, #auth_reply{code = Code, message = Message}} -> case Code of 0 -> lager:debug("[efka_remote_agent] auth success, message: ~p", [Message]), @@ -209,9 +208,7 @@ handle_event(info, flush_cache, _, State) -> %% 激活消息 %% 微服务部署 -handle_event(info, {server_rpc, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> - #deploy{task_id = TaskId, config = Config0} = message_pb:decode_msg(DeployBin, deploy), - +handle_event(info, {server_rpc, PacketId, #rpc_deploy{task_id = TaskId, config = Config0}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> case catch jiffy:decode(Config0, [return_maps]) of Config when is_map(Config) -> %% 短暂的等待,efka_inetd收到消息后就立即返回了 @@ -227,9 +224,9 @@ handle_event(info, {server_rpc, PacketId, <>}, {keep_state, State}; %% 启动微服务 -handle_event(info, {server_rpc, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_rpc, PacketId, #rpc_container{method = <<"start">>, container_name = ContainerName}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> %% 短暂的等待,efka_inetd收到消息后就立即返回了 - case docker_manager:start_container(ServiceId) of + case docker_manager:start_container(ContainerName) of ok -> efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>)); {error, Reason} when is_binary(Reason) -> @@ -238,9 +235,9 @@ handle_event(info, {server_rpc, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_rpc, PacketId, #rpc_container{method = <<"stop">>, container_name = ContainerName}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> %% 短暂的等待,efka_inetd收到消息后就立即返回了 - case docker_manager:stop_container(ServiceId) of + case docker_manager:stop_container(ContainerName) of ok -> efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>)); {error, Reason} when is_binary(Reason) -> @@ -249,9 +246,7 @@ handle_event(info, {server_rpc, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> - #container_config{container_name = ContainerName, config = Config} = message_pb:decode_msg(ConfigBin, push_service_config), - +handle_event(info, {server_rpc, PacketId, #rpc_container{method = <<"config">>, container_name = ContainerName, params = Config}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> case docker_manager:config_container(ContainerName, Config) of ok -> efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>)); @@ -277,7 +272,7 @@ handle_event(info, {server_rpc, PacketId, <>}, StateName, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_command, #command{command_type = ?COMMAND_AUTH, command = Auth}}, StateName, State = #state{transport_pid = TransportPid}) -> case {Auth, StateName} of {1, ?STATE_ACTIVATED} -> {keep_state, State}; @@ -335,7 +330,7 @@ auth_request() -> Salt = proplists:get_value(salt, AuthInfo), Token = proplists:get_value(token, AuthInfo), - efka_codec:encode(?METHOD_AUTH, #auth_request{ + efka_codec:encode(?MESSAGE_AUTH_REQUEST, #auth_request{ uuid = unicode:characters_to_binary(UUID), username = unicode:characters_to_binary(Username), salt = unicode:characters_to_binary(Salt), diff --git a/apps/efka/src/efka_transport.erl b/apps/efka/src/efka_transport.erl index 9397593..5d90b63 100644 --- a/apps/efka/src/efka_transport.erl +++ b/apps/efka/src/efka_transport.erl @@ -14,7 +14,7 @@ %% API -export([start_monitor/3]). --export([connect/1, auth_request/2, send/3, rpc_reply/3, stop/1]). +-export([connect/1, auth_request/2, send/2, rpc_reply/3, stop/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -40,9 +40,9 @@ auth_request(Pid, AuthBin) when is_pid(Pid), is_binary(AuthBin) -> connect(Pid) when is_pid(Pid) -> gen_server:cast(Pid, connect). --spec send(Pid :: pid(), Method :: integer(), Packet :: binary()) -> no_return(). -send(Pid, Method, Packet) when is_pid(Pid), is_integer(Method), is_binary(Packet) -> - gen_server:cast(Pid, {send, Method, Packet}). +-spec send(Pid :: pid(), Packet :: binary()) -> no_return(). +send(Pid, Packet) when is_pid(Pid), is_binary(Packet) -> + gen_server:cast(Pid, {send, Packet}). -spec rpc_reply(Pid :: pid() | undefined, PacketId :: integer(), Response :: binary()) -> no_return(). rpc_reply(undefined, PacketId, Response) when is_integer(PacketId), is_binary(Response) -> @@ -118,7 +118,8 @@ handle_cast({auth_request, AuthRequestBin}, State = #state{parent_pid = ParentPi %% 需要等待auth返回的结果 receive {ssl, Socket, <>} -> - ParentPid ! {auth_reply, {ok, ReplyBin}}, + Reply = efka_codec:decode(ReplyBin), + ParentPid ! {auth_reply, {ok, Reply}}, {noreply, State}; {ssl, Socket, Info} -> lager:warning("[efka_transport] get invalid auth_reply: ~p", [Info]), @@ -129,8 +130,8 @@ handle_cast({auth_request, AuthRequestBin}, State = #state{parent_pid = ParentPi {noreply, State} end; -handle_cast({send, Method, Packet}, State = #state{socket = Socket}) -> - ok = ssl:send(Socket, <>), +handle_cast({send, Packet}, State = #state{socket = Socket}) -> + ok = ssl:send(Socket, <>), {noreply, State}; %% 服务push的消息的回复 @@ -145,16 +146,18 @@ handle_cast({rpc_reply, PacketId, Response}, State = #state{socket = Socket}) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). %% 服务器主动推送的数据,有packetId的是要求返回的;为0的表示不需要返回值 -handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> - ParentPid ! {server_command, CommandType, Command}, +handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> + Command = efka_codec:decode(CommandBin), + ParentPid ! {server_command, Command}, {noreply, State}; handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> - #pub{topic = Topic, content = Content} = message_pb:decode_msg(PubBin, pub), - ParentPid ! {server_pub, Topic, Content}, + Pub = efka_codec:decode(PubBin), + ParentPid ! {server_pub, Pub}, {noreply, State}; -handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> +handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> + RPCRequest = efka_codec:decode(RPCRequestBin), ParentPid ! {server_rpc, PacketId, RPCRequest}, {noreply, State};