diff --git a/apps/efka/src/docker/docker_http.erl b/apps/efka/src/docker/docker_http.erl index 77efc89..2e2b9a8 100644 --- a/apps/efka/src/docker/docker_http.erl +++ b/apps/efka/src/docker/docker_http.erl @@ -1,6 +1,6 @@ %%% docker_http.erl -module(docker_http). --export([request/4]). +-export([request/4, stream_request/5]). %% 通过 Unix Socket 调用 Docker API -spec request(Method :: iolist(), Path :: string(), Body :: binary() | undefined, Headers :: list()) -> @@ -41,4 +41,46 @@ receive_body(ConnPid, StreamRef, Status, Headers, Acc) -> receive_body(ConnPid, StreamRef, Status, Headers, NewAcc) after 10000 -> {error, timeout22} + end. + +%% 通过 Unix Socket 调用 Docker API +-spec stream_request(Callback :: any(), Method :: iolist(), Path :: string(), Body :: binary() | undefined, Headers :: list()) -> no_return(). +stream_request(Callback, Method, Path, Body, Headers) when is_list(Method); is_binary(Method), is_list(Path), is_binary(Body), is_list(Headers) -> + SocketPath = "/var/run/docker.sock", + case gun:open_unix(SocketPath, #{}) of + {ok, ConnPid} -> + {ok, http} = gun:await_up(ConnPid), + %% 如果 Body 是 undefined,就用 <<>> 代替 + BodyBin = case Body of + undefined -> <<>>; + B when is_binary(B) -> B + end, + %% 发送 HTTP 请求 + StreamRef = gun:request(ConnPid, Method, Path, Headers, BodyBin), + receive_response(Callback, ConnPid, StreamRef); + {error, Reason} -> + Callback({error, Reason}), + {error, Reason} + end. + +receive_response(Callback, ConnPid, StreamRef) -> + receive + {gun_response, ConnPid, StreamRef, nofin, Status, Headers} -> + Callback({head, Status, Headers}), + receive_body(Callback, ConnPid, StreamRef); + {gun_down, ConnPid, _, Reason, _} -> + Callback({error, Reason}), + {error, Reason} + after 5000 -> + Callback({error, <<"处理超时"/utf8>>}), + {error, timeout} + end. +receive_body(Callback, ConnPid, StreamRef) -> + receive + {gun_data, ConnPid, StreamRef, fin, Data} -> + Callback({message, Data}), + ok; + {gun_data, ConnPid, StreamRef, nofin, Data} -> + Callback({message, Data}), + receive_body(Callback, ConnPid, StreamRef) end. \ No newline at end of file diff --git a/apps/efka/src/docker/docker_uds_commands.erl b/apps/efka/src/docker/docker_uds_commands.erl index 8db5b0b..c29d2cb 100644 --- a/apps/efka/src/docker/docker_uds_commands.erl +++ b/apps/efka/src/docker/docker_uds_commands.erl @@ -12,24 +12,16 @@ %% API -export([pull_image/2, check_image_exist/1]). -export([create_container/3, check_container_exist/1, is_container_running/1, start_container/1, stop_container/1]). +-export([test/0]). + +test() -> + Image = <<"docker.1ms.run/library/nginx:latest">>, + pull_image(Image, fun(Msg) -> lager:debug("msg is: ~p", [Msg]) end). -spec pull_image(Image :: binary(), Callback :: fun((Msg :: binary()) -> no_return())) -> ok | {error, ExitCode :: integer()}. pull_image(Image, Callback) when is_binary(Image), is_function(Callback, 1) -> - PortSettings = [stream, exit_status, use_stdio, stderr_to_stdout, binary], - ExecCmd = "docker pull " ++ binary_to_list(Image), - lager:debug("cmd : ~p", [ExecCmd]), - case catch erlang:open_port({spawn, ExecCmd}, PortSettings) of - Port when is_port(Port) -> - case gather_pull_output(Port, Callback) of - 0 -> - ok; - ExitCode -> - {error, ExitCode} - end; - Error -> - lager:debug("error: ~p", [Error]), - {error, -1} - end. + Url = io_lib:format("/images/create?fromImage=~s", [binary_to_list(Image)]), + docker_http:stream_request(Callback, "POST", Url, undefined, []). -spec check_image_exist(Image :: binary()) -> boolean(). check_image_exist(Image) when is_binary(Image) ->