ekfa/apps/efka/src/docker/docker_http.erl
2025-09-26 13:53:57 +08:00

87 lines
3.6 KiB
Erlang
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

%%% docker_http.erl
-module(docker_http).
-export([request/4, stream_request/5]).
%% 通过 Unix Socket 调用 Docker API
-spec request(Method :: iolist(), Path :: string(), Body :: binary() | undefined, Headers :: list()) ->
{ok, StatusCode :: integer(), RespHeaders :: proplists:proplist(), RespBody :: binary()} | {error, any()}.
request(Method, Path, Body, Headers) when is_list(Method); is_binary(Method), is_list(Path), is_binary(Body), is_list(Headers) ->
SocketPath = "/var/run/docker.sock",
%% 使用 gun:open/2 + {local, Path} 方式
case gun:open_unix(SocketPath, #{}) of
{ok, ConnPid} ->
{ok, http} = gun:await_up(ConnPid),
%% 如果 Body 是 undefined就用 <<>> 代替
BodyBin = case Body of
undefined -> <<>>;
B when is_binary(B) -> B
end,
%% 发送 HTTP 请求
StreamRef = gun:request(ConnPid, Method, Path, Headers, BodyBin),
receive_response(ConnPid, StreamRef);
{error, Reason} ->
{error, Reason}
end.
receive_response(ConnPid, StreamRef) ->
receive
{gun_response, ConnPid, StreamRef, nofin, Status, Headers} ->
receive_body(ConnPid, StreamRef, Status, Headers, <<>>);
{gun_response, ConnPid, StreamRef, fin, Status, Headers} ->
{ok, Status, Headers, <<>>};
{gun_down, ConnPid, _, Reason, _} ->
{error, {http_closed, Reason}}
after 5000 ->
{error, timeout11}
end.
receive_body(ConnPid, StreamRef, Status, Headers, Acc) ->
receive
{gun_data, ConnPid, StreamRef, fin, Data} ->
{ok, Status, Headers, <<Acc/binary, Data/binary>>};
{gun_data, ConnPid, StreamRef, nofin, Data} ->
NewAcc = <<Acc/binary, Data/binary>>,
receive_body(ConnPid, StreamRef, Status, Headers, NewAcc)
after 10000 ->
{error, timeout22}
end.
%% 通过 Unix Socket 调用 Docker API
-spec stream_request(Callback :: any(), Method :: iolist(), Path :: string(), Body :: binary() | undefined, Headers :: list()) -> ok | {error, Reason :: any()}.
stream_request(Callback, Method, Path, Body, Headers) when is_list(Method); is_binary(Method), is_list(Path), is_binary(Body), is_list(Headers) ->
SocketPath = "/var/run/docker.sock",
case gun:open_unix(SocketPath, #{}) of
{ok, ConnPid} ->
{ok, http} = gun:await_up(ConnPid),
%% 如果 Body 是 undefined就用 <<>> 代替
BodyBin = case Body of
undefined -> <<>>;
B when is_binary(B) -> B
end,
%% 发送 HTTP 请求
StreamRef = gun:request(ConnPid, Method, Path, Headers, BodyBin),
receive_response(Callback, ConnPid, StreamRef);
{error, Reason} ->
Callback({error, Reason}),
{error, Reason}
end.
receive_response(Callback, ConnPid, StreamRef) ->
receive
{gun_response, ConnPid, StreamRef, nofin, _Status, _Headers} ->
receive_body(Callback, ConnPid, StreamRef);
{gun_down, ConnPid, _, Reason, _} ->
Callback({error, Reason}),
{error, Reason}
after 5000 ->
Callback({error, <<"处理超时"/utf8>>}),
{error, timeout}
end.
receive_body(Callback, ConnPid, StreamRef) ->
receive
{gun_data, ConnPid, StreamRef, fin, Data} ->
Callback({message, Data}),
ok;
{gun_data, ConnPid, StreamRef, nofin, Data} ->
Callback({message, Data}),
receive_body(Callback, ConnPid, StreamRef)
end.