diff --git a/apps/efka/src/docker/docker_commands.erl b/apps/efka/src/docker/docker_commands.erl index 5e330f2..ab9a198 100644 --- a/apps/efka/src/docker/docker_commands.erl +++ b/apps/efka/src/docker/docker_commands.erl @@ -12,6 +12,7 @@ %% API -export([pull_image/2, check_image_exist/1]). -export([create_container/3, check_container_exist/1, is_container_running/1, start_container/1, stop_container/1, remove_container/1, kill_container/1]). +-export([get_containers/0]). -spec pull_image(Image :: binary(), Callback :: fun((Msg :: any()) -> no_return())) -> ok | {error, Reason :: any()}. pull_image(Image, Callback) when is_binary(Image), is_function(Callback, 1) -> @@ -156,6 +157,25 @@ remove_container(ContainerName) when is_binary(ContainerName) -> end end. +-spec get_containers() -> {ok, Containers :: [map()]} | {error, Reason :: binary()}. +get_containers() -> + Url = "/containers/json?all=true", + Headers = [ + {<<"Content-Type">>, <<"application/json">>} + ], + case docker_http:request("GET", Url, undefined, Headers) of + {ok, 200, _Headers, ContainersBin} -> + Containers = jiffy:decode(ContainersBin, [return_maps]), + {ok, Containers}; + {ok, _StatusCode, _Header, ErrorResp} -> + case catch jiffy:decode(ErrorResp) of + #{<<"message">> := Msg} -> + {error, Msg}; + _ -> + {error, ErrorResp} + end + end. + -spec inspect_container(ContainerId :: binary()) -> {ok, Json :: map()} | {error, Error :: any()}. inspect_container(ContainerId) when is_binary(ContainerId) -> Url = lists:flatten(io_lib:format("/containers/~s/json", [binary_to_list(ContainerId)])), diff --git a/apps/efka/src/docker/docker_manager.erl b/apps/efka/src/docker/docker_manager.erl index 8c68c49..8120430 100644 --- a/apps/efka/src/docker/docker_manager.erl +++ b/apps/efka/src/docker/docker_manager.erl @@ -16,6 +16,7 @@ %% API -export([start_link/0]). -export([deploy/2, start_container/1, stop_container/1, config_container/2, kill_container/1, remove_container/1]). +-export([get_containers/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -32,6 +33,10 @@ %%% API %%%=================================================================== +-spec get_containers() -> {ok, Containers :: [map()]} | {error, Reason :: binary()}. +get_containers() -> + gen_server:call(?SERVER, get_containers). + -spec deploy(TaskId :: integer(), Config :: map()) -> ok | {error, Reason :: binary()}. deploy(TaskId, Config) when is_integer(TaskId), is_map(Config) -> gen_server:call(?SERVER, {deploy, TaskId, Config}). @@ -140,6 +145,15 @@ handle_call({kill_container, ContainerId}, _From, State = #state{}) -> {reply, {error, Reason}, State} end; +%% 停止服务, 主动停止的时候会改变服务配置的status字段 +handle_call(get_containers, _From, State = #state{}) -> + case docker_commands:get_containers() of + {ok, Containers} -> + {reply, {ok, Containers}, State}; + {error, Reason} -> + {reply, {error, Reason}, State} + end; + %% 停止服务, 主动停止的时候会改变服务配置的status字段 handle_call({remove_container, ContainerId}, _From, State = #state{}) -> case docker_commands:remove_container(ContainerId) of diff --git a/apps/efka/src/efka_remote_agent.erl b/apps/efka/src/efka_remote_agent.erl index 2397c28..e56e8a3 100644 --- a/apps/efka/src/efka_remote_agent.erl +++ b/apps/efka/src/efka_remote_agent.erl @@ -239,6 +239,17 @@ handle_event(info, flush_cache, _, State) -> %% 云端服务器推送了消息 %% 激活消息 +%% 微服务部署 +handle_event(info, {server_rpc, PacketId, #jsonrpc_request{method = <<"get_containers">>}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> + %% 短暂的等待,efka_inetd收到消息后就立即返回了 + case docker_manager:get_containers() of + {ok, Containers} -> + efka_transport:rpc_reply(TransportPid, PacketId, reply_success(Containers)); + {error, Reason} when is_binary(Reason) -> + efka_transport:rpc_reply(TransportPid, PacketId, reply_error(-1, Reason)) + end, + {keep_state, State}; + %% 微服务部署 handle_event(info, {server_rpc, PacketId, #jsonrpc_request{method = <<"deploy">>, params = Params}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> #{<<"task_id">> := TaskId, <<"config">> := Config} = Params, @@ -393,7 +404,7 @@ auth_request() -> }). -spec reply_success(Result :: any()) -> binary(). -reply_success(Result) when is_binary(Result) -> +reply_success(Result) -> message_codec:encode(?MESSAGE_JSONRPC_REPLY, #jsonrpc_reply{result = Result}). -spec reply_error(Code :: integer(), Message :: binary()) -> binary(). diff --git a/apps/efka/src/message/message_codec.erl b/apps/efka/src/message/message_codec.erl index 065eeca..e98f7f3 100644 --- a/apps/efka/src/message/message_codec.erl +++ b/apps/efka/src/message/message_codec.erl @@ -37,7 +37,7 @@ encode0(#jsonrpc_reply{result = Result, error = undefined}) -> ResultBin = jiffy:encode(#{<<"result">> => Result}, [force_utf8]), iolist_to_binary([marshal(?Bytes, ResultBin)]); encode0(#jsonrpc_reply{result = undefined, error = Error}) -> - ResultBin = jiffy:encode(#{<<"error">> => Error}, [force_utf8]), + ResultBin = iolist_to_binary(jiffy:encode(#{<<"error">> => Error}, [force_utf8])), iolist_to_binary([marshal(?Bytes, ResultBin)]); encode0(#pub{topic = Topic, content = Content}) -> iolist_to_binary([ @@ -51,7 +51,7 @@ encode0(#command{command_type = CommandType, command = Command}) -> ]); encode0(#jsonrpc_request{method = Method, params = Params}) -> - ReqBody = jiffy:encode(#{<<"method">> => Method, <<"params">> => Params}, [force_utf8]), + ReqBody = iolist_to_binary(jiffy:encode(#{<<"method">> => Method, <<"params">> => Params}, [force_utf8])), marshal(?Bytes, ReqBody); encode0(#data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}) -> iolist_to_binary([