From d12c973be92aa58078b10c6880fc7b3ffd94602f Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 24 Sep 2025 16:28:47 +0800 Subject: [PATCH] fix --- apps/efka/src/docker/docker_commands.erl | 8 ++-- .../src/docker/docker_container_helper.erl | 17 ++----- apps/efka/src/docker/docker_deployer.erl | 48 +++++++++++-------- apps/efka/src/docker/docker_http.erl | 3 +- apps/efka/src/docker/docker_manager.erl | 3 +- apps/efka/src/efka_remote_agent.erl | 1 + 6 files changed, 39 insertions(+), 41 deletions(-) diff --git a/apps/efka/src/docker/docker_commands.erl b/apps/efka/src/docker/docker_commands.erl index cd01bb4..202a1ee 100644 --- a/apps/efka/src/docker/docker_commands.erl +++ b/apps/efka/src/docker/docker_commands.erl @@ -105,7 +105,7 @@ test_create_container() -> }, create_container(<<"my_nginx_xx3">>, "/usr/local/code/efka/", M). --spec pull_image(Image :: binary(), Callback :: fun((Msg :: binary()) -> no_return())) -> ok | {error, ExitCode :: integer()}. +-spec pull_image(Image :: binary(), Callback :: fun((Msg :: any()) -> no_return())) -> ok | {error, ExitCode :: integer()}. pull_image(Image, Callback) when is_binary(Image), is_function(Callback, 1) -> Url = lists:flatten(io_lib:format("/images/create?fromImage=~s", [binary_to_list(Image)])), docker_http:stream_request(Callback, "POST", Url, undefined, []). @@ -117,7 +117,7 @@ check_image_exist(Image) when is_binary(Image) -> {ok, 200, _Headers, Resp} -> M = catch jiffy:decode(Resp, [return_maps]), is_map(M) andalso maps:is_key(<<"Id">>, M); - {ok, 404, _Headers, _Error} -> + {ok, _, _Headers, _Error} -> false end. @@ -125,11 +125,11 @@ check_image_exist(Image) when is_binary(Image) -> create_container(ContainerName, ContainerDir, Config) when is_binary(ContainerName), is_list(ContainerDir), is_map(Config) -> Url = lists:flatten(io_lib:format("/containers/create?name=~s", [binary_to_list(ContainerName)])), %% 挂载预留的目录,用来作为配置文件的存放 - BinContainerDir = list_to_binary(docker_container_helper:make_etc_dir_name(ContainerDir)), + ConfigFile = list_to_binary(docker_container_helper:get_config_file(ContainerDir)), %% 增加自定义的用来放配置文件的目录 Volumes0 = maps:get(<<"volumes">>, Config, []), - Volumes = [<>|Volumes0], + Volumes = [<>|Volumes0], NewConfig = Config#{<<"volumes">> => Volumes}, Options = build_options(NewConfig), diff --git a/apps/efka/src/docker/docker_container_helper.erl b/apps/efka/src/docker/docker_container_helper.erl index 54e0a56..aa7caa7 100644 --- a/apps/efka/src/docker/docker_container_helper.erl +++ b/apps/efka/src/docker/docker_container_helper.erl @@ -10,7 +10,7 @@ -author("anlicheng"). %% API --export([ensure_dir/2, get_dir/2, ensure_etc_dir/1, make_etc_dir_name/1]). +-export([ensure_dir/2, get_dir/2, get_config_file/1]). -spec ensure_dir(RootDir :: string(), ContainerName :: binary()) -> {ok, ServerRootDir :: string()}. ensure_dir(RootDir, ContainerName) when is_list(RootDir), is_binary(ContainerName) -> @@ -19,12 +19,10 @@ ensure_dir(RootDir, ContainerName) when is_list(RootDir), is_binary(ContainerNam ok = filelib:ensure_dir(ContainerRootDir), {ok, ContainerRootDir}. --spec ensure_etc_dir(ContainerDir :: string()) -> {ok, EtcDir :: string()}. -ensure_etc_dir(ContainerDir) when is_list(ContainerDir) -> +-spec get_config_file(ContainerDir :: string()) -> {ok, EtcDir :: string()}. +get_config_file(ContainerDir) when is_list(ContainerDir) -> %% 根目录 - EtcDir = make_etc_dir_name(ContainerDir), - ok = filelib:ensure_dir(EtcDir), - {ok, EtcDir}. + {ok, ContainerDir ++ "/service.conf"}. -spec get_dir(RootDir :: string(), ContainerName :: binary()) -> {ok, ServerRootDir :: string()} | error. get_dir(RootDir, ContainerName) when is_list(RootDir), is_binary(ContainerName) -> @@ -35,9 +33,4 @@ get_dir(RootDir, ContainerName) when is_list(RootDir), is_binary(ContainerName) {ok, ContainerRootDir}; false -> error - end. - --spec make_etc_dir_name(ContainerDir :: string()) -> string(). -make_etc_dir_name(ContainerDir) when is_list(ContainerDir) -> - %% 根目录 - ContainerDir ++ "etc/". \ No newline at end of file + end. \ No newline at end of file diff --git a/apps/efka/src/docker/docker_deployer.erl b/apps/efka/src/docker/docker_deployer.erl index b953e03..c3c1085 100644 --- a/apps/efka/src/docker/docker_deployer.erl +++ b/apps/efka/src/docker/docker_deployer.erl @@ -177,30 +177,36 @@ do_deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(Contain Image0 = maps:get(<<"image">>, Config), Image = normalize_image(Image0), efka_remote_agent:task_event_stream(TaskId, <<"使用镜像:"/utf8, Image/binary>>), - case docker_commands:check_image_exist(Image) of - true -> - efka_remote_agent:task_event_stream(TaskId, <<"镜像本地已存在:"/utf8, Image/binary>>), - efka_remote_agent:task_event_stream(TaskId, <<"任务完成"/utf8>>); - false -> - efka_remote_agent:task_event_stream(TaskId, <<"开始拉取镜像:"/utf8, Image/binary>>), - CB = fun(Msg) -> efka_remote_agent:task_event_stream(TaskId, Msg) end, - case docker_commands:pull_image(Image, CB) of - ok -> - efka_remote_agent:task_event_stream(TaskId, <<"开始创建容器: "/utf8, ContainerName/binary>>), - case docker_commands:create_container(ContainerName, ContainerDir, Config) of - {ok, ContainerId} -> - ShortContainerId = binary:part(ContainerId, 1, 12), - efka_remote_agent:task_event_stream(TaskId, <<"容器创建成功: "/utf8, ShortContainerId/binary>>), - efka_remote_agent:task_event_stream(TaskId, <<"任务完成"/utf8>>); - {error, Reason} -> - efka_remote_agent:task_event_stream(TaskId, <<"容器创建失败: "/utf8, Reason/binary>>), - efka_remote_agent:task_event_stream(TaskId, <<"任务失败"/utf8>>) - end; + PullResult = case docker_commands:check_image_exist(Image) of + true -> + efka_remote_agent:task_event_stream(TaskId, <<"镜像本地已存在:"/utf8, Image/binary>>), + ok; + false -> + efka_remote_agent:task_event_stream(TaskId, <<"开始拉取镜像:"/utf8, Image/binary>>), + CB = fun + ({message, Msg}) -> + efka_remote_agent:task_event_stream(TaskId, Msg); + ({error, Error}) -> + efka_remote_agent:task_event_stream(TaskId, Error) + end, + docker_commands:pull_image(Image, CB) + end, + + case PullResult of + ok -> + efka_remote_agent:task_event_stream(TaskId, <<"开始创建容器: "/utf8, ContainerName/binary>>), + case docker_commands:create_container(ContainerName, ContainerDir, Config) of + {ok, ContainerId} -> + ShortContainerId = binary:part(ContainerId, 1, 12), + efka_remote_agent:task_event_stream(TaskId, <<"容器创建成功: "/utf8, ShortContainerId/binary>>), + efka_remote_agent:task_event_stream(TaskId, <<"任务完成"/utf8>>); {error, Reason} -> - efka_remote_agent:task_event_stream(TaskId, <<"镜像拉取失败"/utf8>>), + efka_remote_agent:task_event_stream(TaskId, <<"容器创建失败: "/utf8, Reason/binary>>), efka_remote_agent:task_event_stream(TaskId, <<"任务失败"/utf8>>) - end + end; + {error, Reason} -> + efka_remote_agent:task_event_stream(TaskId, <<"镜像拉取失败: "/utf8, Reason/binary>>) end end. diff --git a/apps/efka/src/docker/docker_http.erl b/apps/efka/src/docker/docker_http.erl index a04e483..dd54154 100644 --- a/apps/efka/src/docker/docker_http.erl +++ b/apps/efka/src/docker/docker_http.erl @@ -67,8 +67,7 @@ stream_request(Callback, Method, Path, Body, Headers) when is_list(Method); is_b receive_response(Callback, ConnPid, StreamRef) -> receive - {gun_response, ConnPid, StreamRef, nofin, Status, Headers} -> - Callback({head, Status, Headers}), + {gun_response, ConnPid, StreamRef, nofin, _Status, _Headers} -> receive_body(Callback, ConnPid, StreamRef); {gun_down, ConnPid, _, Reason, _} -> Callback({error, Reason}), diff --git a/apps/efka/src/docker/docker_manager.erl b/apps/efka/src/docker/docker_manager.erl index d08e07b..92b618c 100644 --- a/apps/efka/src/docker/docker_manager.erl +++ b/apps/efka/src/docker/docker_manager.erl @@ -91,8 +91,7 @@ handle_call({config_container, ContainerName, Config}, _From, State = #state{roo case docker_container_helper:get_dir(RootDir, ContainerName) of {ok, ContainerDir} -> %% 覆盖容器的配置文件 - {ok, EtcDir} = docker_container_helper:ensure_etc_dir(ContainerDir), - ConfigFile = EtcDir ++ "config", + {ok, ConfigFile} = docker_container_helper:get_config_file(ContainerDir), case file:write_file(ConfigFile, Config, [write, binary]) of ok -> lager:warning("[docker_manager] write config file: ~p success", [ConfigFile]), diff --git a/apps/efka/src/efka_remote_agent.erl b/apps/efka/src/efka_remote_agent.erl index 8bbd275..583806f 100644 --- a/apps/efka/src/efka_remote_agent.erl +++ b/apps/efka/src/efka_remote_agent.erl @@ -125,6 +125,7 @@ handle_event(cast, {event, ServiceId, EventType, Params}, _, State) -> {keep_state, State}; handle_event(cast, {task_event_stream, TaskId, Stream}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> + lager:debug("[efka_remote_agent] event_stream task_id: ~p, stream: ~ts", [TaskId, Stream]), EventPacket = message_codec:encode(?MESSAGE_EVENT_STREAM, #task_event_stream{ task_id = TaskId, stream = Stream