处理数据流

This commit is contained in:
anlicheng 2025-09-23 15:05:40 +08:00
parent 637cdea796
commit f00f654e7a
2 changed files with 50 additions and 16 deletions

View File

@ -1,6 +1,6 @@
%%% docker_http.erl
-module(docker_http).
-export([request/4]).
-export([request/4, stream_request/5]).
%% Unix Socket Docker API
-spec request(Method :: iolist(), Path :: string(), Body :: binary() | undefined, Headers :: list()) ->
@ -42,3 +42,45 @@ receive_body(ConnPid, StreamRef, Status, Headers, Acc) ->
after 10000 ->
{error, timeout22}
end.
%% Unix Socket Docker API
-spec stream_request(Callback :: any(), Method :: iolist(), Path :: string(), Body :: binary() | undefined, Headers :: list()) -> no_return().
stream_request(Callback, Method, Path, Body, Headers) when is_list(Method); is_binary(Method), is_list(Path), is_binary(Body), is_list(Headers) ->
SocketPath = "/var/run/docker.sock",
case gun:open_unix(SocketPath, #{}) of
{ok, ConnPid} ->
{ok, http} = gun:await_up(ConnPid),
%% Body undefined <<>>
BodyBin = case Body of
undefined -> <<>>;
B when is_binary(B) -> B
end,
%% HTTP
StreamRef = gun:request(ConnPid, Method, Path, Headers, BodyBin),
receive_response(Callback, ConnPid, StreamRef);
{error, Reason} ->
Callback({error, Reason}),
{error, Reason}
end.
receive_response(Callback, ConnPid, StreamRef) ->
receive
{gun_response, ConnPid, StreamRef, nofin, Status, Headers} ->
Callback({head, Status, Headers}),
receive_body(Callback, ConnPid, StreamRef);
{gun_down, ConnPid, _, Reason, _} ->
Callback({error, Reason}),
{error, Reason}
after 5000 ->
Callback({error, <<"处理超时"/utf8>>}),
{error, timeout}
end.
receive_body(Callback, ConnPid, StreamRef) ->
receive
{gun_data, ConnPid, StreamRef, fin, Data} ->
Callback({message, Data}),
ok;
{gun_data, ConnPid, StreamRef, nofin, Data} ->
Callback({message, Data}),
receive_body(Callback, ConnPid, StreamRef)
end.

View File

@ -12,24 +12,16 @@
%% API
-export([pull_image/2, check_image_exist/1]).
-export([create_container/3, check_container_exist/1, is_container_running/1, start_container/1, stop_container/1]).
-export([test/0]).
test() ->
Image = <<"docker.1ms.run/library/nginx:latest">>,
pull_image(Image, fun(Msg) -> lager:debug("msg is: ~p", [Msg]) end).
-spec pull_image(Image :: binary(), Callback :: fun((Msg :: binary()) -> no_return())) -> ok | {error, ExitCode :: integer()}.
pull_image(Image, Callback) when is_binary(Image), is_function(Callback, 1) ->
PortSettings = [stream, exit_status, use_stdio, stderr_to_stdout, binary],
ExecCmd = "docker pull " ++ binary_to_list(Image),
lager:debug("cmd : ~p", [ExecCmd]),
case catch erlang:open_port({spawn, ExecCmd}, PortSettings) of
Port when is_port(Port) ->
case gather_pull_output(Port, Callback) of
0 ->
ok;
ExitCode ->
{error, ExitCode}
end;
Error ->
lager:debug("error: ~p", [Error]),
{error, -1}
end.
Url = io_lib:format("/images/create?fromImage=~s", [binary_to_list(Image)]),
docker_http:stream_request(Callback, "POST", Url, undefined, []).
-spec check_image_exist(Image :: binary()) -> boolean().
check_image_exist(Image) when is_binary(Image) ->