diff --git a/apps/efka/src/docker/docker_commands.erl b/apps/efka/src/docker/docker_commands.erl index 58909ab..186dff5 100644 --- a/apps/efka/src/docker/docker_commands.erl +++ b/apps/efka/src/docker/docker_commands.erl @@ -43,7 +43,7 @@ create_container(ContainerName, ContainerDir, Config) when is_binary(ContainerNa NewConfig = Config#{<<"volumes">> => Volumes}, Options = build_options(ContainerName, NewConfig), - lists:foreach(fun({K, V}) -> lager:debug("~p => ~p", [K, V]) end, maps:to_list(Options)), + display_options(Options), Body = iolist_to_binary(jiffy:encode(Options, [force_utf8])), Headers = [ @@ -59,7 +59,9 @@ create_container(ContainerName, ContainerDir, Config) when is_binary(ContainerNa {error, Msg}; _ -> {error, ErrorResp} - end + end; + {error, Reason} -> + {error, Reason} end. -spec is_container_running(ContainerId :: binary()) -> boolean(). @@ -434,4 +436,8 @@ build_extra_hosts(Config) -> #{}; _ -> #{<<"ExtraHosts">> => Hosts} - end. \ No newline at end of file + end. + +-spec display_options(Options :: map()) -> no_return(). +display_options(Options) when is_map(Options) -> + lists:foreach(fun({K, V}) -> lager:debug("~p => ~p", [K, V]) end, maps:to_list(Options)). diff --git a/apps/efka/src/docker/docker_deployer.erl b/apps/efka/src/docker/docker_deployer.erl index e1672c2..520ee3a 100644 --- a/apps/efka/src/docker/docker_deployer.erl +++ b/apps/efka/src/docker/docker_deployer.erl @@ -41,52 +41,36 @@ start_monitor(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(Con deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerDir), is_map(Config) -> %% 尝试拉取镜像 ContainerName = maps:get(<<"container_name">>, Config), + log(TaskId, <<"info">>, <<"开始部署容器:"/utf8, ContainerName/binary>>), - Msg = <<"开始部署容器:"/utf8, ContainerName/binary>>, - efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg), - efka_logger:write(format_log(TaskId, <<"info">>, Msg)), - + ContainerName = <<>>, case docker_commands:check_container_exist(ContainerName) of true -> - Msg1 = <<"本地容器已经存在:"/utf8, ContainerName/binary>>, - efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg1), - efka_remote_agent:close_task_event_stream(TaskId), - efka_logger:write(format_log(TaskId, <<"info">>, Msg1)); + log(TaskId, <<"info">>, <<"本地容器已经存在:"/utf8, ContainerName/binary>>), + efka_remote_agent:close_task_event_stream(TaskId); false -> Image0 = maps:get(<<"image">>, Config), Image = normalize_image(Image0), - Msg2 = <<"使用镜像:"/utf8, Image/binary>>, - efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg2), - efka_logger:write(format_log(TaskId, <<"info">>, Msg2)), - + log(TaskId, <<"info">>, <<"使用镜像:"/utf8, Image/binary>>), PullResult = case docker_commands:check_image_exist(Image) of true -> - Msg3 = <<"镜像本地已存在:"/utf8, Image/binary>>, - efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg3), - efka_logger:write(format_log(TaskId, <<"info">>, Msg3)), + log(TaskId, <<"info">>, <<"镜像本地已存在:"/utf8, Image/binary>>), ok; false -> - Msg4 = <<"开始拉取镜像:"/utf8, Image/binary>>, - efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg4), - efka_logger:write(format_log(TaskId, <<"info">>, Msg4)), + log(TaskId, <<"info">>, <<"开始拉取镜像:"/utf8, Image/binary>>), CB = fun ({message, M}) -> - efka_remote_agent:task_event_stream(TaskId, <<"info">>, M), - efka_logger:write(format_log(TaskId, <<"info">>, M)); + log(TaskId, <<"info">>, M); ({error, Error}) -> - efka_remote_agent:task_event_stream(TaskId, <<"error">>, Error), - efka_logger:write(format_log(TaskId, <<"error">>, Error)) + log(TaskId, <<"error">>, Error) end, docker_commands:pull_image(Image, CB) end, case PullResult of ok -> - Msg5 = <<"开始创建容器: "/utf8, ContainerName/binary>>, - efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg5), - efka_logger:write(format_log(TaskId, <<"info">>, Msg5)), - + log(TaskId, <<"info">>, <<"开始创建容器: "/utf8, ContainerName/binary>>), case docker_commands:create_container(ContainerName, ContainerDir, Config) of {ok, ContainerId} -> %% 创建容器对应的配置文件 @@ -97,37 +81,19 @@ deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerD file:close(FD); {error, Reason} -> Reason1 = list_to_binary(io_lib:format("~p", [Reason])), - Msg6 = <<"创建配置文件失败: "/utf8, Reason1/binary>>, - efka_remote_agent:task_event_stream(TaskId, <<"notice">>, Msg6), - efka_logger:write(format_log(TaskId, <<"notice">>, Msg6)) + log(TaskId, <<"notice">>, <<"创建配置文件失败: "/utf8, Reason1/binary>>) end, ShortContainerId = binary:part(ContainerId, 1, 12), - - Msg7 = <<"容器创建成功: "/utf8, ShortContainerId/binary>>, - efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg7), - efka_logger:write(format_log(TaskId, <<"info">>, Msg7)), - - Msg8 = <<"任务完成"/utf8>>, - efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg8), - efka_logger:write(format_log(TaskId, <<"info">>, Msg8)), - + log(TaskId, <<"info">>, <<"容器创建成功: "/utf8, ShortContainerId/binary>>), + log(TaskId, <<"info">>, <<"任务完成"/utf8>>), efka_remote_agent:close_task_event_stream(TaskId); {error, Reason} -> - Msg9 = <<"容器创建失败: "/utf8, Reason/binary>>, - efka_remote_agent:task_event_stream(TaskId, <<"error">>, Msg9), - efka_logger:write(format_log(TaskId, <<"error">>, Msg9)), - - Msg10 = <<"任务失败"/utf8>>, - efka_remote_agent:task_event_stream(TaskId, <<"error">>, Msg10), - efka_logger:write(format_log(TaskId, <<"error">>, Msg10)), - + log(TaskId, <<"error">>, <<"容器创建失败: "/utf8, Reason/binary>>), + log(TaskId, <<"error">>, <<"任务失败"/utf8>>), efka_remote_agent:close_task_event_stream(TaskId) end; {error, Reason} -> - Msg11 = <<"镜像拉取失败: "/utf8, Reason/binary>>, - efka_remote_agent:task_event_stream(TaskId, <<"error">>, Msg11), - efka_logger:write(format_log(TaskId, <<"error">>, Msg11)), - + log(TaskId, <<"error">>, <<"镜像拉取失败: "/utf8, Reason/binary>>), efka_remote_agent:close_task_event_stream(TaskId) end end. @@ -142,6 +108,8 @@ normalize_image(Image) when is_binary(Image) -> end, iolist_to_binary(lists:join(<<"/">>, PrefixParts ++ [NormalizedLast])). --spec format_log(Label :: binary(), TaskId :: integer(), Msg :: binary()) -> binary(). -format_log(TaskId, Label, Msg) when is_binary(Label), is_binary(Msg) -> - iolist_to_binary([<<"task_id=">>, integer_to_binary(TaskId), <<" ">>, Label, <<" ">>, Msg]). \ No newline at end of file +-spec log(TaskId :: integer(), Level :: binary(), Msg :: binary()) -> no_return(). +log(TaskId, Level, Msg) when is_integer(TaskId), is_binary(Level), is_binary(Msg) -> + efka_remote_agent:task_event_stream(TaskId, Level, Msg), + Info = iolist_to_binary([<<"task_id=">>, integer_to_binary(TaskId), <<" ">>, Level, <<" ">>, Msg]), + efka_logger:write(Info). \ No newline at end of file diff --git a/apps/efka/src/docker/docker_http.erl b/apps/efka/src/docker/docker_http.erl index e3183c3..ecc049c 100644 --- a/apps/efka/src/docker/docker_http.erl +++ b/apps/efka/src/docker/docker_http.erl @@ -3,21 +3,16 @@ -export([request/4, stream_request/5]). %% 通过 Unix Socket 调用 Docker API --spec request(Method :: iolist(), Path :: string(), Body :: binary() | undefined, Headers :: list()) -> +-spec request(Method :: string(), Path :: string(), Body :: binary() | undefined, Headers :: list()) -> {ok, StatusCode :: integer(), RespHeaders :: proplists:proplist(), RespBody :: binary()} | {error, any()}. -request(Method, Path, Body, Headers) when is_list(Method); is_binary(Method), is_list(Path), is_binary(Body), is_list(Headers) -> +request(Method, Path, Body, Headers) when is_list(Method), is_list(Path), is_binary(Body), is_list(Headers) -> SocketPath = "/var/run/docker.sock", %% 使用 gun:open/2 + {local, Path} 方式 case gun:open_unix(SocketPath, #{}) of {ok, ConnPid} -> {ok, http} = gun:await_up(ConnPid), - %% 如果 Body 是 undefined,就用 <<>> 代替 - BodyBin = case Body of - undefined -> <<>>; - B when is_binary(B) -> B - end, %% 发送 HTTP 请求 - StreamRef = gun:request(ConnPid, Method, Path, Headers, BodyBin), + StreamRef = gun:request(ConnPid, Method, Path, Headers, Body), receive_response(ConnPid, StreamRef); {error, Reason} -> {error, Reason} @@ -32,7 +27,7 @@ receive_response(ConnPid, StreamRef) -> {gun_down, ConnPid, _, Reason, _} -> {error, {http_closed, Reason}} after 5000 -> - {error, timeout11} + {error, timeout} end. receive_body(ConnPid, StreamRef, Status, Headers, Acc) -> receive diff --git a/apps/efka/src/efka_logger.erl b/apps/efka/src/efka_logger.erl index b6a4291..db3b69a 100644 --- a/apps/efka/src/efka_logger.erl +++ b/apps/efka/src/efka_logger.erl @@ -29,7 +29,8 @@ %%% API %%%=================================================================== -write(Data) -> +-spec write(Data :: binary()) -> no_return(). +write(Data) when is_binary(Data) -> gen_server:cast(?SERVER, {write, Data}). write_lines(Lines) when is_list(Lines) -> diff --git a/apps/efka/src/ws_channel.erl b/apps/efka/src/ws_channel.erl index 19d972d..2eec84b 100644 --- a/apps/efka/src/ws_channel.erl +++ b/apps/efka/src/ws_channel.erl @@ -18,7 +18,7 @@ -define(PENDING_TIMEOUT, 10 * 1000). -record(state, { - service_id :: binary(), + service_id :: undefined | binary(), service_pid :: undefined | pid(), is_registered = false :: boolean() }).