diff --git a/apps/efka/include/message.hrl b/apps/efka/include/message.hrl index a08c533..97c9f34 100644 --- a/apps/efka/include/message.hrl +++ b/apps/efka/include/message.hrl @@ -66,7 +66,7 @@ -record(jsonrpc_request, { id = 0 :: integer(), method :: binary(), - params = <<>> :: binary() + params = <<>> :: any() }). -record(jsonrpc_reply, { diff --git a/apps/efka/src/efka_remote_agent.erl b/apps/efka/src/efka_remote_agent.erl index f55a684..bf696c5 100644 --- a/apps/efka/src/efka_remote_agent.erl +++ b/apps/efka/src/efka_remote_agent.erl @@ -240,23 +240,20 @@ handle_event(info, flush_cache, _, State) -> %% 激活消息 %% 微服务部署 -handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"deploy">>, params = #{<<"task_id">> := TaskId, <<"config">> := Config0}}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> - case catch jiffy:decode(Config0, [return_maps]) of - Config when is_map(Config) -> - %% 短暂的等待,efka_inetd收到消息后就立即返回了 - case docker_manager:deploy(TaskId, Config) of - ok -> - efka_transport:rpc_reply(TransportPid, PacketId, reply_success(Id, <<"ok">>)); - {error, Reason} when is_binary(Reason) -> - efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Id, -1, Reason)) - end; - _Error -> - efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Id, -1, <<"invalid config json">>)) +handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"deploy">>, params = Params}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> + #{<<"task_id">> := TaskId, <<"config">> := Config} = Params, + %% 短暂的等待,efka_inetd收到消息后就立即返回了 + case docker_manager:deploy(TaskId, Config) of + ok -> + efka_transport:rpc_reply(TransportPid, PacketId, reply_success(Id, <<"ok">>)); + {error, Reason} when is_binary(Reason) -> + efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Id, -1, Reason)) end, {keep_state, State}; %% 启动微服务 -handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"start_container">>, params = #{<<"container_name">> := ContainerName}}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"start_container">>, params = Params}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> + #{<<"container_name">> := ContainerName} = Params, %% 短暂的等待,efka_inetd收到消息后就立即返回了 case docker_manager:start_container(ContainerName) of ok -> @@ -267,7 +264,8 @@ handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<" {keep_state, State}; %% 停止微服务 -handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"stop_container">>, params = #{<<"container_name">> := ContainerName}}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"stop_container">>, params = Params}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> + #{<<"container_name">> := ContainerName} = Params, %% 短暂的等待,efka_inetd收到消息后就立即返回了 case docker_manager:stop_container(ContainerName) of ok -> @@ -277,7 +275,8 @@ handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<" end, {keep_state, State}; -handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"kill_container">>, params = #{<<"container_name">> := ContainerName}}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"kill_container">>, params = Params}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> + #{<<"container_name">> := ContainerName} = Params, %% 短暂的等待,efka_inetd收到消息后就立即返回了 case docker_manager:kill_container(ContainerName) of ok -> @@ -287,7 +286,8 @@ handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<" end, {keep_state, State}; -handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"remove_container">>, params = #{<<"container_name">> := ContainerName}}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"remove_container">>, params = Params}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> + #{<<"container_name">> := ContainerName} = Params, %% 短暂的等待,efka_inetd收到消息后就立即返回了 case docker_manager:remove_container(ContainerName) of ok -> @@ -298,7 +298,8 @@ handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<" {keep_state, State}; %% config.json配置信息 -handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"config_container">>, params = #{<<"container_name">> := ContainerName, <<"config">> := Config}}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"config_container">>, params = Params}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> + #{<<"container_name">> := ContainerName, <<"config">> := Config} = Params, case docker_manager:config_container(ContainerName, Config) of ok -> efka_transport:rpc_reply(TransportPid, PacketId, reply_success(Id, <<"ok">>)); diff --git a/apps/efka/src/message/message_codec.erl b/apps/efka/src/message/message_codec.erl index a7decc0..0c29766 100644 --- a/apps/efka/src/message/message_codec.erl +++ b/apps/efka/src/message/message_codec.erl @@ -34,10 +34,10 @@ encode0(#auth_reply{code = Code, payload = Payload}) -> marshal(?Bytes, Payload) ]); encode0(#jsonrpc_reply{id = Id, result = Result, error = undefined}) -> - ResultBin = jiffy:encode(#{<<"id">> => Id, <<"result">> => Result}, [return_maps]), + ResultBin = jiffy:encode(#{<<"id">> => Id, <<"result">> => Result}, [force_utf8]), iolist_to_binary([marshal(?Bytes, ResultBin)]); encode0(#jsonrpc_reply{id = Id, result = undefined, error = Error}) -> - ResultBin = jiffy:encode(#{<<"id">> => Id, <<"error">> => Error}, [return_maps]), + ResultBin = jiffy:encode(#{<<"id">> => Id, <<"error">> => Error}, [force_utf8]), iolist_to_binary([marshal(?Bytes, ResultBin)]); encode0(#pub{topic = Topic, content = Content}) -> iolist_to_binary([ @@ -51,8 +51,8 @@ encode0(#command{command_type = CommandType, command = Command}) -> ]); encode0(#jsonrpc_request{id = Id, method = Method, params = Params}) -> - ReqBody = jiffy:decode(#{<<"id">> => Id, <<"method">> => Method, <<"params">> => Params}, [force_utf8]), - iolist_to_binary([marshal(?Bytes, ReqBody)]); + ReqBody = jiffy:encode(#{<<"id">> => Id, <<"method">> => Method, <<"params">> => Params}, [force_utf8]), + marshal(?Bytes, ReqBody); encode0(#data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}) -> iolist_to_binary([ marshal(?Bytes, ServiceId),