fix
This commit is contained in:
parent
7c1c1ed9af
commit
b8addc039a
@ -87,5 +87,6 @@
|
|||||||
|
|
||||||
-record(task_event_stream, {
|
-record(task_event_stream, {
|
||||||
task_id :: integer(),
|
task_id :: integer(),
|
||||||
|
type :: binary(),
|
||||||
stream :: binary()
|
stream :: binary()
|
||||||
}).
|
}).
|
||||||
@ -9,6 +9,7 @@
|
|||||||
-module(docker_deployer).
|
-module(docker_deployer).
|
||||||
-author("anlicheng").
|
-author("anlicheng").
|
||||||
-include("efka_tables.hrl").
|
-include("efka_tables.hrl").
|
||||||
|
-include("message.hrl").
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
@ -23,6 +24,11 @@
|
|||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
|
-define(STREAM_TYPE_INFO, <<"info">>).
|
||||||
|
-define(STREAM_TYPE_ERROR, <<"error">>).
|
||||||
|
-define(STREAM_TYPE_WARNING, <<"warning">>).
|
||||||
|
-define(STREAM_TYPE_NOTICE, <<"notice">>).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
root_dir :: string(),
|
root_dir :: string(),
|
||||||
task_id :: integer(),
|
task_id :: integer(),
|
||||||
@ -168,34 +174,34 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
|
|||||||
do_deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerDir), is_map(Config) ->
|
do_deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerDir), is_map(Config) ->
|
||||||
%% 尝试拉取镜像
|
%% 尝试拉取镜像
|
||||||
ContainerName = maps:get(<<"container_name">>, Config),
|
ContainerName = maps:get(<<"container_name">>, Config),
|
||||||
efka_remote_agent:task_event_stream(TaskId, <<"开始部署容器:"/utf8, ContainerName/binary>>),
|
efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"开始部署容器:"/utf8, ContainerName/binary>>),
|
||||||
|
|
||||||
case docker_commands:check_container_exist(ContainerName) of
|
case docker_commands:check_container_exist(ContainerName) of
|
||||||
true ->
|
true ->
|
||||||
efka_remote_agent:task_event_stream(TaskId, <<"本地容器已经存在:"/utf8, ContainerName/binary>>);
|
efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"本地容器已经存在:"/utf8, ContainerName/binary>>);
|
||||||
false ->
|
false ->
|
||||||
Image0 = maps:get(<<"image">>, Config),
|
Image0 = maps:get(<<"image">>, Config),
|
||||||
Image = normalize_image(Image0),
|
Image = normalize_image(Image0),
|
||||||
efka_remote_agent:task_event_stream(TaskId, <<"使用镜像:"/utf8, Image/binary>>),
|
efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"使用镜像:"/utf8, Image/binary>>),
|
||||||
|
|
||||||
PullResult = case docker_commands:check_image_exist(Image) of
|
PullResult = case docker_commands:check_image_exist(Image) of
|
||||||
true ->
|
true ->
|
||||||
efka_remote_agent:task_event_stream(TaskId, <<"镜像本地已存在:"/utf8, Image/binary>>),
|
efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"镜像本地已存在:"/utf8, Image/binary>>),
|
||||||
ok;
|
ok;
|
||||||
false ->
|
false ->
|
||||||
efka_remote_agent:task_event_stream(TaskId, <<"开始拉取镜像:"/utf8, Image/binary>>),
|
efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"开始拉取镜像:"/utf8, Image/binary>>),
|
||||||
CB = fun
|
CB = fun
|
||||||
({message, Msg}) ->
|
({message, Msg}) ->
|
||||||
efka_remote_agent:task_event_stream(TaskId, Msg);
|
efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, Msg);
|
||||||
({error, Error}) ->
|
({error, Error}) ->
|
||||||
efka_remote_agent:task_event_stream(TaskId, Error)
|
efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_ERROR, Error)
|
||||||
end,
|
end,
|
||||||
docker_commands:pull_image(Image, CB)
|
docker_commands:pull_image(Image, CB)
|
||||||
end,
|
end,
|
||||||
|
|
||||||
case PullResult of
|
case PullResult of
|
||||||
ok ->
|
ok ->
|
||||||
efka_remote_agent:task_event_stream(TaskId, <<"开始创建容器: "/utf8, ContainerName/binary>>),
|
efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"开始创建容器: "/utf8, ContainerName/binary>>),
|
||||||
case docker_commands:create_container(ContainerName, ContainerDir, Config) of
|
case docker_commands:create_container(ContainerName, ContainerDir, Config) of
|
||||||
{ok, ContainerId} ->
|
{ok, ContainerId} ->
|
||||||
%% 创建容器对应的配置文件
|
%% 创建容器对应的配置文件
|
||||||
@ -206,17 +212,17 @@ do_deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(Contain
|
|||||||
file:close(FD);
|
file:close(FD);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
Reason1 = list_to_binary(io_lib:format("~p", [Reason])),
|
Reason1 = list_to_binary(io_lib:format("~p", [Reason])),
|
||||||
efka_remote_agent:task_event_stream(TaskId, <<"创建配置文件失败: "/utf8, Reason1/binary>>)
|
efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_NOTICE, <<"创建配置文件失败: "/utf8, Reason1/binary>>)
|
||||||
end,
|
end,
|
||||||
ShortContainerId = binary:part(ContainerId, 1, 12),
|
ShortContainerId = binary:part(ContainerId, 1, 12),
|
||||||
efka_remote_agent:task_event_stream(TaskId, <<"容器创建成功: "/utf8, ShortContainerId/binary>>),
|
efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"容器创建成功: "/utf8, ShortContainerId/binary>>),
|
||||||
efka_remote_agent:task_event_stream(TaskId, <<"任务完成"/utf8>>);
|
efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_INFO, <<"任务完成"/utf8>>);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
efka_remote_agent:task_event_stream(TaskId, <<"容器创建失败: "/utf8, Reason/binary>>),
|
efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_ERROR, <<"容器创建失败: "/utf8, Reason/binary>>),
|
||||||
efka_remote_agent:task_event_stream(TaskId, <<"任务失败"/utf8>>)
|
efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_ERROR, <<"任务失败"/utf8>>)
|
||||||
end;
|
end;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
efka_remote_agent:task_event_stream(TaskId, <<"镜像拉取失败: "/utf8, Reason/binary>>)
|
efka_remote_agent:task_event_stream(TaskId, ?STREAM_TYPE_ERROR, <<"镜像拉取失败: "/utf8, Reason/binary>>)
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|||||||
@ -15,7 +15,7 @@
|
|||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
-export([metric_data/4, event/3, ping/13, task_event_stream/2]).
|
-export([metric_data/4, event/3, ping/13, task_event_stream/3]).
|
||||||
|
|
||||||
%% gen_statem callbacks
|
%% gen_statem callbacks
|
||||||
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
||||||
@ -49,9 +49,9 @@ metric_data(ServiceId, DeviceUUID, RouteKey, Metric) when is_binary(ServiceId),
|
|||||||
event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventType), is_binary(Params) ->
|
event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventType), is_binary(Params) ->
|
||||||
gen_statem:cast(?SERVER, {event, ServiceId, EventType, Params}).
|
gen_statem:cast(?SERVER, {event, ServiceId, EventType, Params}).
|
||||||
|
|
||||||
-spec task_event_stream(TaskId :: integer(), Stream :: binary()) -> no_return().
|
-spec task_event_stream(TaskId :: integer(), Type :: binary(), Stream :: binary()) -> no_return().
|
||||||
task_event_stream(TaskId, Stream) when is_integer(TaskId), is_binary(Stream) ->
|
task_event_stream(TaskId, Type, Stream) when is_integer(TaskId), is_binary(Type), is_binary(Stream) ->
|
||||||
gen_statem:cast(?SERVER, {task_event_stream, TaskId, Stream}).
|
gen_statem:cast(?SERVER, {task_event_stream, TaskId, Type, Stream}).
|
||||||
|
|
||||||
ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) ->
|
ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) ->
|
||||||
gen_statem:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}).
|
gen_statem:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}).
|
||||||
@ -124,10 +124,11 @@ handle_event(cast, {event, ServiceId, EventType, Params}, _, State) ->
|
|||||||
ok = cache_model:insert(EventPacket),
|
ok = cache_model:insert(EventPacket),
|
||||||
{keep_state, State};
|
{keep_state, State};
|
||||||
|
|
||||||
handle_event(cast, {task_event_stream, TaskId, Stream}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
handle_event(cast, {task_event_stream, TaskId, Type, Stream}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
||||||
lager:debug("[efka_remote_agent] event_stream task_id: ~p, stream: ~ts", [TaskId, Stream]),
|
lager:debug("[efka_remote_agent] event_stream task_id: ~p, stream: ~ts", [TaskId, Stream]),
|
||||||
EventPacket = message_codec:encode(?MESSAGE_EVENT_STREAM, #task_event_stream{
|
EventPacket = message_codec:encode(?MESSAGE_EVENT_STREAM, #task_event_stream{
|
||||||
task_id = TaskId,
|
task_id = TaskId,
|
||||||
|
type = Type,
|
||||||
stream = Stream
|
stream = Stream
|
||||||
}),
|
}),
|
||||||
efka_transport:send(TransportPid, EventPacket),
|
efka_transport:send(TransportPid, EventPacket),
|
||||||
|
|||||||
@ -67,9 +67,10 @@ encode0(#event{service_id = ServiceId, event_type = EventType, params = Params})
|
|||||||
marshal(?I32, EventType),
|
marshal(?I32, EventType),
|
||||||
marshal(?Bytes, Params)
|
marshal(?Bytes, Params)
|
||||||
]);
|
]);
|
||||||
encode0(#task_event_stream{task_id = TaskId, stream = Stream}) ->
|
encode0(#task_event_stream{task_id = TaskId, type = Type, stream = Stream}) ->
|
||||||
iolist_to_binary([
|
iolist_to_binary([
|
||||||
marshal(?I32, TaskId),
|
marshal(?I32, TaskId),
|
||||||
|
marshal(?Bytes, Type),
|
||||||
marshal(?Bytes, Stream)
|
marshal(?Bytes, Stream)
|
||||||
]).
|
]).
|
||||||
|
|
||||||
@ -97,8 +98,8 @@ decode0(?MESSAGE_DATA, [ServiceId, DeviceUUID, RouteKey, Metric]) ->
|
|||||||
{ok, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}};
|
{ok, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}};
|
||||||
decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) ->
|
decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) ->
|
||||||
{ok, #event{service_id = ServiceId, event_type = EventType, params = Params}};
|
{ok, #event{service_id = ServiceId, event_type = EventType, params = Params}};
|
||||||
decode0(?MESSAGE_EVENT_STREAM, [TaskId, Stream]) ->
|
decode0(?MESSAGE_EVENT_STREAM, [TaskId, Type, Stream]) ->
|
||||||
{ok, #task_event_stream{task_id = TaskId, stream = Stream}};
|
{ok, #task_event_stream{task_id = TaskId, type = Type, stream = Stream}};
|
||||||
decode0(_, _) ->
|
decode0(_, _) ->
|
||||||
error.
|
error.
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user