diff --git a/apps/efka/src/docker/docker_uds_commands.erl b/apps/efka/src/docker/docker_uds_commands.erl new file mode 100644 index 0000000..8db5b0b --- /dev/null +++ b/apps/efka/src/docker/docker_uds_commands.erl @@ -0,0 +1,340 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 15. 9月 2025 16:11 +%%%------------------------------------------------------------------- +-module(docker_uds_commands). +-author("anlicheng"). + +%% API +-export([pull_image/2, check_image_exist/1]). +-export([create_container/3, check_container_exist/1, is_container_running/1, start_container/1, stop_container/1]). + +-spec pull_image(Image :: binary(), Callback :: fun((Msg :: binary()) -> no_return())) -> ok | {error, ExitCode :: integer()}. +pull_image(Image, Callback) when is_binary(Image), is_function(Callback, 1) -> + PortSettings = [stream, exit_status, use_stdio, stderr_to_stdout, binary], + ExecCmd = "docker pull " ++ binary_to_list(Image), + lager:debug("cmd : ~p", [ExecCmd]), + case catch erlang:open_port({spawn, ExecCmd}, PortSettings) of + Port when is_port(Port) -> + case gather_pull_output(Port, Callback) of + 0 -> + ok; + ExitCode -> + {error, ExitCode} + end; + Error -> + lager:debug("error: ~p", [Error]), + {error, -1} + end. + +-spec check_image_exist(Image :: binary()) -> boolean(). +check_image_exist(Image) when is_binary(Image) -> + Url = io_lib:format("/images/~s/json", [binary_to_list(Image)]), + case docker_http:request("GET", Url, undefined, []) of + {ok, 200, _Headers, Resp} -> + M = catch jiffy:decode(Resp, [return_maps]), + is_map(M) andalso maps:is_key(<<"Id">>, M); + {ok, 404, _Headers, _Error} -> + false + end. + +-spec create_container(ContainerName :: binary(), ContainerDir :: string(), Config :: map()) -> {ok, ContainerId :: binary()} | {error, Reason :: any()}. +create_container(ContainerName, ContainerDir, Config) when is_binary(ContainerName), is_list(ContainerDir), is_map(Config) -> + Image = maps:get(<<"image">>, Config), + Cmd = maps:get(<<"command">>, Config, []), + + %% 挂载预留的目录,用来作为配置文件的存放 + BinContainerDir = list_to_binary(docker_container_helper:make_etc_dir_name(ContainerDir)), + BaseOptions = [<<"-v">>, <>], + + Options = build_options(Config), + Args = lists:flatten([Image | BaseOptions ++ Options ++ Cmd]), + CreateArgs = iolist_to_binary(lists:join(<<" ">>, Args)), + + PortSettings = [stream, exit_status, stderr_to_stdout, use_stdio, binary], + ExecCmd = "docker create --name " ++ binary_to_list(ContainerName) ++ " " ++ binary_to_list(CreateArgs), + lager:debug("create_container cmd : ~p", [ExecCmd]), + + case catch erlang:open_port({spawn, ExecCmd}, PortSettings) of + Port when is_port(Port) -> + case gather_output(Port) of + {0, ContainerId} -> + {ok, string:trim(ContainerId)}; + {ExitCode, Error} -> + {error, {ExitCode, Error}} + end; + Error -> + lager:debug("error: ~p", [Error]), + {error, <<"exec command startup failed">>} + end. + +-spec is_container_running(ContainerId :: binary()) -> boolean(). +is_container_running(ContainerId) when is_binary(ContainerId) -> + PortSettings = [stream, exit_status, use_stdio, stderr_to_stdout, binary], + ExecCmd = "docker inspect -f '{{.State.Running}}' " ++ binary_to_list(ContainerId), + case catch erlang:open_port({spawn, ExecCmd}, PortSettings) of + Port when is_port(Port) -> + case gather_output(Port) of + {0, Val0} -> + Val = string:trim(Val0), + Val =:= <<"true">>; + _ -> + false + end; + _Error -> + false + end. + +-spec check_container_exist(ContainerName :: binary()) -> boolean(). +check_container_exist(ContainerName) when is_binary(ContainerName) -> + PortSettings = [stream, exit_status, use_stdio, binary], + ExecCmd = "docker inspect --type=container " ++ binary_to_list(ContainerName) ++ " >/dev/null 2>&1", + lager:debug("check_container_exist cmd : ~p", [ExecCmd]), + case catch erlang:open_port({spawn, ExecCmd}, PortSettings) of + Port when is_port(Port) -> + case gather_output(Port) of + {0, _} -> + true; + {_ExitCode, _Error} -> + false + end; + _Error -> + false + end. + +-spec start_container(ContainerName :: binary()) -> ok | {error, Reason :: binary()}. +start_container(ContainerName) when is_binary(ContainerName) -> + PortSettings = [stream, exit_status, use_stdio, binary], + ExecCmd = "docker start " ++ binary_to_list(ContainerName) ++ " 2>&1", + case catch erlang:open_port({spawn, ExecCmd}, PortSettings) of + Port when is_port(Port) -> + case gather_output(Port) of + {0, _} -> + ok; + {_ExitCode, Error} -> + {error, Error} + end; + _Error -> + {error, <<"启动失败"/utf8>>} + end. + +-spec stop_container(ContainerName :: binary()) -> ok | {error, Reason :: binary()}. +stop_container(ContainerName) when is_binary(ContainerName) -> + PortSettings = [stream, exit_status, use_stdio, binary], + ExecCmd = "docker stop " ++ binary_to_list(ContainerName) ++ " 2>&1", + lager:debug("[docker_commands] cmd : ~p", [ExecCmd]), + case catch erlang:open_port({spawn, ExecCmd}, PortSettings) of + Port when is_port(Port) -> + case gather_output(Port) of + {0, _} -> + ok; + {_ExitCode, Error} -> + {error, Error} + end; + _Error -> + false + end. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%% helper methods +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +gather_output(Port) -> + gather_output(Port, <<>>). +gather_output(Port, Acc) -> + receive + {Port, {data, Data}} -> + gather_output(Port, [Acc, Data]); + {Port, {exit_status, Status}} -> + {Status, iolist_to_binary(Acc)} + end. + +-spec gather_pull_output(Port :: port(), Callback :: fun((Msg :: binary()) -> no_return())) -> ExitCode :: integer(). +gather_pull_output(Port, Callback) -> + receive + {Port, {data, Data}} -> + Callback(Data), + gather_pull_output(Port, Callback); + {Port, {exit_status, Status}} -> + Status + end. + +%% 构建所有参数 +build_options(Config) -> + lists:flatten([ + build_entrypoint(Config), + build_ports(Config), + build_expose(Config), + build_volumes(Config), + build_env(Config), + build_env_file(Config), + build_networks(Config), + build_labels(Config), + build_restart(Config), + build_user(Config), + build_working_dir(Config), + build_hostname(Config), + build_privileged(Config), + build_cap_add_drop(Config), + build_devices(Config), + build_memory(Config), + build_cpu(Config), + build_ulimits(Config), + build_sysctls(Config), + build_tmpfs(Config), + build_extra_hosts(Config), + build_healthcheck(Config) + ]). + +build_entrypoint(Config) -> + case maps:get(<<"entrypoint">>, Config, []) of + [] -> []; + EP -> [<<"--entrypoint">> | EP] + end. + +build_ports(Config) -> + Ports = maps:get(<<"ports">>, Config, []), + lists:map(fun(P) -> [<<"-p">>, P] end, Ports). + +build_expose(Config) -> + Ports = maps:get(<<"expose">>, Config, []), + lists:map(fun(P) -> [<<"--expose">>, P] end, Ports). + +build_volumes(Config) -> + Vols = maps:get(<<"volumes">>, Config, []), + lists:map(fun(V) -> [<<"-v">>, V] end, Vols). + +build_env(Config) -> + Envs = maps:get(<<"envs">>, Config, []), + lists:map(fun(E) -> [<<"-e">>, E] end, Envs). + +build_env_file(Config) -> + Files = maps:get(<<"env_file">>, Config, []), + lists:map(fun(F) -> [<<"--env-file">>, F] end, Files). + +build_networks(Config) -> + Nets = maps:get(<<"networks">>, Config, []), + lists:map(fun(Net) -> [<<"--network">>, Net] end, Nets). + +build_labels(Config) -> + case maps:get(<<"labels">>, Config, #{}) of + #{} -> + []; + Labels -> + lists:map(fun({K, V}) -> [<<"--label">>, <>, Config, undefined) of + undefined -> []; + Policy -> [<<"--restart">>, Policy] + end. + +build_user(Config) -> + case maps:get(<<"user">>, Config, undefined) of + undefined -> []; + U -> [<<"--user">>, U] + end. + +build_working_dir(Config) -> + case maps:get(<<"working_dir">>, Config, undefined) of + undefined -> []; + D -> [<<"--workdir">>, D] + end. + +build_hostname(Config) -> + case maps:get(<<"hostname">>, Config, undefined) of + undefined -> []; + H -> [<<"--hostname">>, H] + end. + +build_privileged(Config) -> + case maps:get(<<"privileged">>, Config, false) of + true -> [<<"--privileged">>]; + _ -> [] + end. + +build_cap_add_drop(Config) -> + Add = maps:get(<<"cap_add">>, Config, []), + Drop = maps:get(<<"cap_drop">>, Config, []), + lists:map(fun(C) -> [<<"--cap-add">>, C] end, Add) ++ lists:map(fun(C0) -> [<<"--cap-drop">>, C0] end, Drop). + +build_devices(Config) -> + Devs = maps:get(<<"devices">>, Config, []), + lists:map(fun(D) -> [<<"--device">>, D] end, Devs). + +build_memory(Config) -> + Mem = maps:get(<<"mem_limit">>, Config, undefined), + MemRes = maps:get(<<"mem_reservation">>, Config, undefined), + Res1 = if Mem /= undefined -> [<<"--memory">>, Mem]; true -> [] end, + Res2 = if MemRes /= undefined -> [<<"--memory-reservation">>, MemRes]; true -> [] end, + Res1 ++ Res2. + +build_cpu(Config) -> + CPU = maps:get(<<"cpus">>, Config, undefined), + Shares = maps:get(<<"cpu_shares">>, Config, undefined), + Res1 = if + CPU /= undefined -> + Bin = iolist_to_binary(io_lib:format("~p", [CPU])), + [<<"--cpus">>, Bin]; + true -> + [] + end, + Res2 = if + Shares /= undefined -> + Bin1 = iolist_to_binary(io_lib:format("~p", [Shares])), + [<<"--cpu-shares">>, Bin1]; + true -> + [] + end, + Res1 ++ Res2. + +build_ulimits(Config) -> + UL = maps:get(<<"ulimits">>, Config, #{}), + lists:map(fun({K, V}) -> [<<"--ulimit">>, <>, Config, #{}), + lists:map(fun({K, V}) -> [<<"--sysctl ">>, <>, Config, []), + lists:map(fun(T) -> [<<"--tmpfs">>, T] end, Tmp). + +build_extra_hosts(Config) -> + Hosts = maps:get(<<"extra_hosts">>, Config, []), + lists:map(fun(H) -> [<<"--add-host">>, H] end, Hosts). + +build_healthcheck(Config) -> + HC = maps:get(<<"healthcheck">>, Config, #{}), + lists:map(fun({K, V}) -> + case K of + <<"test">> -> + case V of + %% Test 是 ["CMD-SHELL", Cmd] + [<<"CMD-SHELL">>, Cmd] -> + [<<"--health-cmd">>, <<$", Cmd/binary, $">>]; + %% Test 是 ["CMD", Arg1, Arg2...] + [<<"CMD">> | CmdList] -> + CmdArgs = iolist_to_binary(lists:join(<<" ">>, CmdList)), + [<<"--health-cmd">>, <<$", CmdArgs/binary, $">>]; + %% Test 是 <<"NONE">> + [<<"NONE">>] -> + [<<"--no-healthcheck">>]; + _ -> + [] + end; + <<"interval">> -> + [<<"--health-interval">>, V]; + <<"timeout">> -> + [<<"--health-timeout">>, V]; + <<"retries">> -> + [<<"--health-retries">>, io_lib:format("~p", [V])]; + _ -> + [] + end + end, maps:to_list(HC)).