From a7d565b38f77bfa676f929dc01841cba70361bf3 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Mon, 22 Sep 2025 19:15:26 +0800 Subject: [PATCH] fix docker commands --- apps/efka/src/docker/docker_commands.erl | 44 +++++++++++++----------- apps/efka/src/docker/docker_deployer.erl | 20 +++++++---- apps/efka/src/message/message_codec.erl | 4 +-- 3 files changed, 39 insertions(+), 29 deletions(-) diff --git a/apps/efka/src/docker/docker_commands.erl b/apps/efka/src/docker/docker_commands.erl index 177ce44..7f87dc5 100644 --- a/apps/efka/src/docker/docker_commands.erl +++ b/apps/efka/src/docker/docker_commands.erl @@ -10,28 +10,25 @@ -author("anlicheng"). %% API --export([pull_image/1, check_image_exist/1]). +-export([pull_image/2, check_image_exist/1]). -export([create_container/3, check_container_exist/1, is_container_running/1, start_container/1, stop_container/1]). --spec pull_image(Image :: binary()) -> {ok, Output :: binary()} | {error, Reason :: any()}. -pull_image(Image) when is_binary(Image) -> - %% todo 重定向错误流 {stderr_to_stdout, true} - PortSettings = [stream, exit_status, use_stdio, binary], +-spec pull_image(Image :: binary(), Callback :: fun((Msg :: binary()) -> no_return())) -> ok | {error, ExitCode :: integer()}. +pull_image(Image, Callback) when is_binary(Image), is_function(Callback, 1) -> + PortSettings = [stream, exit_status, use_stdio, stderr_to_stdout, binary], ExecCmd = "docker pull " ++ binary_to_list(Image), lager:debug("cmd : ~p", [ExecCmd]), case catch erlang:open_port({spawn, ExecCmd}, PortSettings) of Port when is_port(Port) -> - case gather_output(Port) of - {0, Output} -> - lager:debug("docker pull output: ~p", [Output]), - {ok, Output}; - {ExitCode, Error} -> - lager:debug("call me here: ~p", [Error]), - {error, {ExitCode, Error}} + case gather_pull_output(Port, Callback) of + 0 -> + ok; + ExitCode -> + {error, ExitCode} end; Error -> lager:debug("error: ~p", [Error]), - {error, <<"exec command startup failed">>} + {error, -1} end. -spec check_image_exist(Image :: binary()) -> boolean(). @@ -66,17 +63,15 @@ create_container(ContainerName, ContainerDir, Config) when is_binary(ContainerNa Args = lists:flatten([Image | BaseOptions ++ Options ++ Cmd]), CreateArgs = iolist_to_binary(lists:join(<<" ">>, Args)), - %% todo 重定向错误流 {stderr_to_stdout, true} - PortSettings = [stream, exit_status, use_stdio, binary], + PortSettings = [stream, exit_status, stderr_to_stdout, use_stdio, binary], ExecCmd = "docker create --name " ++ binary_to_list(ContainerName) ++ " " ++ binary_to_list(CreateArgs), lager:debug("create_container cmd : ~p", [ExecCmd]), case catch erlang:open_port({spawn, ExecCmd}, PortSettings) of Port when is_port(Port) -> case gather_output(Port) of - {0, <>} -> - {ok, ContainerId}; + {0, ContainerId} -> + {ok, string:trim(ContainerId)}; {ExitCode, Error} -> - lager:debug("call me here: ~p", [Error]), {error, {ExitCode, Error}} end; Error -> @@ -86,8 +81,7 @@ create_container(ContainerName, ContainerDir, Config) when is_binary(ContainerNa -spec is_container_running(ContainerId :: binary()) -> boolean(). is_container_running(ContainerId) when is_binary(ContainerId) -> - %% todo 重定向错误流 {stderr_to_stdout, true} - PortSettings = [stream, exit_status, use_stdio, binary], + PortSettings = [stream, exit_status, use_stdio, stderr_to_stdout, binary], ExecCmd = "docker inspect -f '{{.State.Running}}' " ++ binary_to_list(ContainerId), case catch erlang:open_port({spawn, ExecCmd}, PortSettings) of Port when is_port(Port) -> @@ -166,6 +160,16 @@ gather_output(Port, Acc) -> {Status, iolist_to_binary(Acc)} end. +-spec gather_pull_output(Port :: port(), Callback :: fun((Msg :: binary()) -> no_return())) -> ExitCode :: integer(). +gather_pull_output(Port, Callback) -> + receive + {Port, {data, Data}} -> + Callback(Data), + gather_pull_output(Port, Callback); + {Port, {exit_status, Status}} -> + Status + end. + %% 构建所有参数 build_options(Config) -> lists:flatten([ diff --git a/apps/efka/src/docker/docker_deployer.erl b/apps/efka/src/docker/docker_deployer.erl index 9ee9f2f..b953e03 100644 --- a/apps/efka/src/docker/docker_deployer.erl +++ b/apps/efka/src/docker/docker_deployer.erl @@ -179,21 +179,27 @@ do_deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(Contain efka_remote_agent:task_event_stream(TaskId, <<"使用镜像:"/utf8, Image/binary>>), case docker_commands:check_image_exist(Image) of true -> - efka_remote_agent:task_event_stream(TaskId, <<"镜像本地已存在:"/utf8, Image/binary>>); + efka_remote_agent:task_event_stream(TaskId, <<"镜像本地已存在:"/utf8, Image/binary>>), + efka_remote_agent:task_event_stream(TaskId, <<"任务完成"/utf8>>); false -> efka_remote_agent:task_event_stream(TaskId, <<"开始拉取镜像:"/utf8, Image/binary>>), - case docker_commands:pull_image(Image) of - {ok, Output} -> - efka_remote_agent:task_event_stream(TaskId, Output), + + CB = fun(Msg) -> efka_remote_agent:task_event_stream(TaskId, Msg) end, + case docker_commands:pull_image(Image, CB) of + ok -> efka_remote_agent:task_event_stream(TaskId, <<"开始创建容器: "/utf8, ContainerName/binary>>), case docker_commands:create_container(ContainerName, ContainerDir, Config) of {ok, ContainerId} -> - efka_remote_agent:task_event_stream(TaskId, <<"容器创建成功: "/utf8, ContainerId/binary>>); + ShortContainerId = binary:part(ContainerId, 1, 12), + efka_remote_agent:task_event_stream(TaskId, <<"容器创建成功: "/utf8, ShortContainerId/binary>>), + efka_remote_agent:task_event_stream(TaskId, <<"任务完成"/utf8>>); {error, Reason} -> - efka_remote_agent:task_event_stream(TaskId, <<"容器创建失败: "/utf8, Reason/binary>>) + efka_remote_agent:task_event_stream(TaskId, <<"容器创建失败: "/utf8, Reason/binary>>), + efka_remote_agent:task_event_stream(TaskId, <<"任务失败"/utf8>>) end; {error, Reason} -> - efka_remote_agent:task_event_stream(TaskId, <<"镜像拉取失败:"/utf8, Reason/binary>>) + efka_remote_agent:task_event_stream(TaskId, <<"镜像拉取失败"/utf8>>), + efka_remote_agent:task_event_stream(TaskId, <<"任务失败"/utf8>>) end end end. diff --git a/apps/efka/src/message/message_codec.erl b/apps/efka/src/message/message_codec.erl index b8aa2bd..ddb5078 100644 --- a/apps/efka/src/message/message_codec.erl +++ b/apps/efka/src/message/message_codec.erl @@ -69,8 +69,8 @@ encode0(#event{service_id = ServiceId, event_type = EventType, params = Params}) ]); encode0(#task_event_stream{task_id = TaskId, stream = Stream}) -> iolist_to_binary([ - marshal(?Bytes, TaskId), - marshal(?I32, Stream) + marshal(?I32, TaskId), + marshal(?Bytes, Stream) ]). -spec decode(Bin :: binary()) -> {ok, Message :: any()} | error.