fix dialyzer

This commit is contained in:
anlicheng 2025-10-09 12:59:21 +08:00
parent 901d91e83e
commit 5ed669d568
5 changed files with 37 additions and 67 deletions

View File

@ -43,7 +43,7 @@ create_container(ContainerName, ContainerDir, Config) when is_binary(ContainerNa
NewConfig = Config#{<<"volumes">> => Volumes}, NewConfig = Config#{<<"volumes">> => Volumes},
Options = build_options(ContainerName, NewConfig), Options = build_options(ContainerName, NewConfig),
lists:foreach(fun({K, V}) -> lager:debug("~p => ~p", [K, V]) end, maps:to_list(Options)), display_options(Options),
Body = iolist_to_binary(jiffy:encode(Options, [force_utf8])), Body = iolist_to_binary(jiffy:encode(Options, [force_utf8])),
Headers = [ Headers = [
@ -59,7 +59,9 @@ create_container(ContainerName, ContainerDir, Config) when is_binary(ContainerNa
{error, Msg}; {error, Msg};
_ -> _ ->
{error, ErrorResp} {error, ErrorResp}
end end;
{error, Reason} ->
{error, Reason}
end. end.
-spec is_container_running(ContainerId :: binary()) -> boolean(). -spec is_container_running(ContainerId :: binary()) -> boolean().
@ -434,4 +436,8 @@ build_extra_hosts(Config) ->
#{}; #{};
_ -> _ ->
#{<<"ExtraHosts">> => Hosts} #{<<"ExtraHosts">> => Hosts}
end. end.
-spec display_options(Options :: map()) -> no_return().
display_options(Options) when is_map(Options) ->
lists:foreach(fun({K, V}) -> lager:debug("~p => ~p", [K, V]) end, maps:to_list(Options)).

View File

@ -41,52 +41,36 @@ start_monitor(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(Con
deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerDir), is_map(Config) -> deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerDir), is_map(Config) ->
%% %%
ContainerName = maps:get(<<"container_name">>, Config), ContainerName = maps:get(<<"container_name">>, Config),
log(TaskId, <<"info">>, <<"开始部署容器:"/utf8, ContainerName/binary>>),
Msg = <<"开始部署容器:"/utf8, ContainerName/binary>>, ContainerName = <<>>,
efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg),
efka_logger:write(format_log(TaskId, <<"info">>, Msg)),
case docker_commands:check_container_exist(ContainerName) of case docker_commands:check_container_exist(ContainerName) of
true -> true ->
Msg1 = <<"本地容器已经存在:"/utf8, ContainerName/binary>>, log(TaskId, <<"info">>, <<"本地容器已经存在:"/utf8, ContainerName/binary>>),
efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg1), efka_remote_agent:close_task_event_stream(TaskId);
efka_remote_agent:close_task_event_stream(TaskId),
efka_logger:write(format_log(TaskId, <<"info">>, Msg1));
false -> false ->
Image0 = maps:get(<<"image">>, Config), Image0 = maps:get(<<"image">>, Config),
Image = normalize_image(Image0), Image = normalize_image(Image0),
Msg2 = <<"使用镜像:"/utf8, Image/binary>>, log(TaskId, <<"info">>, <<"使用镜像:"/utf8, Image/binary>>),
efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg2),
efka_logger:write(format_log(TaskId, <<"info">>, Msg2)),
PullResult = case docker_commands:check_image_exist(Image) of PullResult = case docker_commands:check_image_exist(Image) of
true -> true ->
Msg3 = <<"镜像本地已存在:"/utf8, Image/binary>>, log(TaskId, <<"info">>, <<"镜像本地已存在:"/utf8, Image/binary>>),
efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg3),
efka_logger:write(format_log(TaskId, <<"info">>, Msg3)),
ok; ok;
false -> false ->
Msg4 = <<"开始拉取镜像:"/utf8, Image/binary>>, log(TaskId, <<"info">>, <<"开始拉取镜像:"/utf8, Image/binary>>),
efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg4),
efka_logger:write(format_log(TaskId, <<"info">>, Msg4)),
CB = fun CB = fun
({message, M}) -> ({message, M}) ->
efka_remote_agent:task_event_stream(TaskId, <<"info">>, M), log(TaskId, <<"info">>, M);
efka_logger:write(format_log(TaskId, <<"info">>, M));
({error, Error}) -> ({error, Error}) ->
efka_remote_agent:task_event_stream(TaskId, <<"error">>, Error), log(TaskId, <<"error">>, Error)
efka_logger:write(format_log(TaskId, <<"error">>, Error))
end, end,
docker_commands:pull_image(Image, CB) docker_commands:pull_image(Image, CB)
end, end,
case PullResult of case PullResult of
ok -> ok ->
Msg5 = <<"开始创建容器: "/utf8, ContainerName/binary>>, log(TaskId, <<"info">>, <<"开始创建容器: "/utf8, ContainerName/binary>>),
efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg5),
efka_logger:write(format_log(TaskId, <<"info">>, Msg5)),
case docker_commands:create_container(ContainerName, ContainerDir, Config) of case docker_commands:create_container(ContainerName, ContainerDir, Config) of
{ok, ContainerId} -> {ok, ContainerId} ->
%% %%
@ -97,37 +81,19 @@ deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerD
file:close(FD); file:close(FD);
{error, Reason} -> {error, Reason} ->
Reason1 = list_to_binary(io_lib:format("~p", [Reason])), Reason1 = list_to_binary(io_lib:format("~p", [Reason])),
Msg6 = <<"创建配置文件失败: "/utf8, Reason1/binary>>, log(TaskId, <<"notice">>, <<"创建配置文件失败: "/utf8, Reason1/binary>>)
efka_remote_agent:task_event_stream(TaskId, <<"notice">>, Msg6),
efka_logger:write(format_log(TaskId, <<"notice">>, Msg6))
end, end,
ShortContainerId = binary:part(ContainerId, 1, 12), ShortContainerId = binary:part(ContainerId, 1, 12),
log(TaskId, <<"info">>, <<"容器创建成功: "/utf8, ShortContainerId/binary>>),
Msg7 = <<"容器创建成功: "/utf8, ShortContainerId/binary>>, log(TaskId, <<"info">>, <<"任务完成"/utf8>>),
efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg7),
efka_logger:write(format_log(TaskId, <<"info">>, Msg7)),
Msg8 = <<"任务完成"/utf8>>,
efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg8),
efka_logger:write(format_log(TaskId, <<"info">>, Msg8)),
efka_remote_agent:close_task_event_stream(TaskId); efka_remote_agent:close_task_event_stream(TaskId);
{error, Reason} -> {error, Reason} ->
Msg9 = <<"容器创建失败: "/utf8, Reason/binary>>, log(TaskId, <<"error">>, <<"容器创建失败: "/utf8, Reason/binary>>),
efka_remote_agent:task_event_stream(TaskId, <<"error">>, Msg9), log(TaskId, <<"error">>, <<"任务失败"/utf8>>),
efka_logger:write(format_log(TaskId, <<"error">>, Msg9)),
Msg10 = <<"任务失败"/utf8>>,
efka_remote_agent:task_event_stream(TaskId, <<"error">>, Msg10),
efka_logger:write(format_log(TaskId, <<"error">>, Msg10)),
efka_remote_agent:close_task_event_stream(TaskId) efka_remote_agent:close_task_event_stream(TaskId)
end; end;
{error, Reason} -> {error, Reason} ->
Msg11 = <<"镜像拉取失败: "/utf8, Reason/binary>>, log(TaskId, <<"error">>, <<"镜像拉取失败: "/utf8, Reason/binary>>),
efka_remote_agent:task_event_stream(TaskId, <<"error">>, Msg11),
efka_logger:write(format_log(TaskId, <<"error">>, Msg11)),
efka_remote_agent:close_task_event_stream(TaskId) efka_remote_agent:close_task_event_stream(TaskId)
end end
end. end.
@ -142,6 +108,8 @@ normalize_image(Image) when is_binary(Image) ->
end, end,
iolist_to_binary(lists:join(<<"/">>, PrefixParts ++ [NormalizedLast])). iolist_to_binary(lists:join(<<"/">>, PrefixParts ++ [NormalizedLast])).
-spec format_log(Label :: binary(), TaskId :: integer(), Msg :: binary()) -> binary(). -spec log(TaskId :: integer(), Level :: binary(), Msg :: binary()) -> no_return().
format_log(TaskId, Label, Msg) when is_binary(Label), is_binary(Msg) -> log(TaskId, Level, Msg) when is_integer(TaskId), is_binary(Level), is_binary(Msg) ->
iolist_to_binary([<<"task_id=">>, integer_to_binary(TaskId), <<" ">>, Label, <<" ">>, Msg]). efka_remote_agent:task_event_stream(TaskId, Level, Msg),
Info = iolist_to_binary([<<"task_id=">>, integer_to_binary(TaskId), <<" ">>, Level, <<" ">>, Msg]),
efka_logger:write(Info).

View File

@ -3,21 +3,16 @@
-export([request/4, stream_request/5]). -export([request/4, stream_request/5]).
%% Unix Socket Docker API %% Unix Socket Docker API
-spec request(Method :: iolist(), Path :: string(), Body :: binary() | undefined, Headers :: list()) -> -spec request(Method :: string(), Path :: string(), Body :: binary() | undefined, Headers :: list()) ->
{ok, StatusCode :: integer(), RespHeaders :: proplists:proplist(), RespBody :: binary()} | {error, any()}. {ok, StatusCode :: integer(), RespHeaders :: proplists:proplist(), RespBody :: binary()} | {error, any()}.
request(Method, Path, Body, Headers) when is_list(Method); is_binary(Method), is_list(Path), is_binary(Body), is_list(Headers) -> request(Method, Path, Body, Headers) when is_list(Method), is_list(Path), is_binary(Body), is_list(Headers) ->
SocketPath = "/var/run/docker.sock", SocketPath = "/var/run/docker.sock",
%% 使 gun:open/2 + {local, Path} %% 使 gun:open/2 + {local, Path}
case gun:open_unix(SocketPath, #{}) of case gun:open_unix(SocketPath, #{}) of
{ok, ConnPid} -> {ok, ConnPid} ->
{ok, http} = gun:await_up(ConnPid), {ok, http} = gun:await_up(ConnPid),
%% Body undefined <<>>
BodyBin = case Body of
undefined -> <<>>;
B when is_binary(B) -> B
end,
%% HTTP %% HTTP
StreamRef = gun:request(ConnPid, Method, Path, Headers, BodyBin), StreamRef = gun:request(ConnPid, Method, Path, Headers, Body),
receive_response(ConnPid, StreamRef); receive_response(ConnPid, StreamRef);
{error, Reason} -> {error, Reason} ->
{error, Reason} {error, Reason}
@ -32,7 +27,7 @@ receive_response(ConnPid, StreamRef) ->
{gun_down, ConnPid, _, Reason, _} -> {gun_down, ConnPid, _, Reason, _} ->
{error, {http_closed, Reason}} {error, {http_closed, Reason}}
after 5000 -> after 5000 ->
{error, timeout11} {error, timeout}
end. end.
receive_body(ConnPid, StreamRef, Status, Headers, Acc) -> receive_body(ConnPid, StreamRef, Status, Headers, Acc) ->
receive receive

View File

@ -29,7 +29,8 @@
%%% API %%% API
%%%=================================================================== %%%===================================================================
write(Data) -> -spec write(Data :: binary()) -> no_return().
write(Data) when is_binary(Data) ->
gen_server:cast(?SERVER, {write, Data}). gen_server:cast(?SERVER, {write, Data}).
write_lines(Lines) when is_list(Lines) -> write_lines(Lines) when is_list(Lines) ->

View File

@ -18,7 +18,7 @@
-define(PENDING_TIMEOUT, 10 * 1000). -define(PENDING_TIMEOUT, 10 * 1000).
-record(state, { -record(state, {
service_id :: binary(), service_id :: undefined | binary(),
service_pid :: undefined | pid(), service_pid :: undefined | pid(),
is_registered = false :: boolean() is_registered = false :: boolean()
}). }).