This commit is contained in:
anlicheng 2025-09-16 16:40:11 +08:00
parent ed90316457
commit 6b16612214
6 changed files with 76 additions and 89 deletions

View File

@ -81,7 +81,7 @@
-ifndef('DATA_PB_H').
-define('DATA_PB_H', true).
-record(data,
{service_id = <<>> :: unicode:chardata() | undefined, % = 1, optional
{meta_tag = <<>> :: unicode:chardata() | undefined, % = 1, optional
device_uuid = <<>> :: unicode:chardata() | undefined, % = 2, optional
route_key = <<>> :: unicode:chardata() | undefined, % = 3, optional
metric = <<>> :: iodata() | undefined % = 4, optional

View File

@ -14,8 +14,11 @@
-behaviour(gen_server).
-define(STATUS_RUNNING, running).
-define(STATUS_STOPPED, stopped).
%% API
-export([start_link/3]).
-export([start_link/2]).
-export([get_name/1, get_pid/1, attach_channel/3]).
-export([push_envs/3, invoke/3]).
-export([metric_data/4, send_event/3]).
@ -24,7 +27,6 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-record(state, {
%service_id :: binary(),
container_id :: binary(),
%% id信息
channel_pid :: pid() | undefined,
@ -32,11 +34,10 @@
meta :: binary(),
%% port信息, OSPid = erlang:port_info(Port, os_pid)
port :: undefined | port(),
%% pid
os_pid :: undefined | integer(),
%%
manifest :: undefined | efka_manifest:manifest(),
inflight = #{},
%%
status = ?STATUS_STOPPED,
%% : #{Ref => Fun}
callbacks = #{}
@ -75,10 +76,10 @@ attach_channel(Pid, ChannelPid, Meta) when is_pid(Pid), is_pid(ChannelPid), is_b
gen_server:call(Pid, {attach_channel, ChannelPid, Meta}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(Name :: atom(), ContainerId :: binary(), Args :: [binary()]) ->
-spec(start_link(Name :: atom(), ContainerId :: binary()) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(Name, ContainerId, Args) when is_atom(Name), is_binary(ContainerId), is_list(Args) ->
gen_server:start_link({local, Name}, ?MODULE, [ContainerId, Args], []).
start_link(Name, ContainerId) when is_atom(Name), is_binary(ContainerId) ->
gen_server:start_link({local, Name}, ?MODULE, [ContainerId], []).
%%%===================================================================
%%% gen_server callbacks
@ -89,18 +90,16 @@ start_link(Name, ContainerId, Args) when is_atom(Name), is_binary(ContainerId),
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([ContainerId, Args]) ->
init([ContainerId]) ->
%% supervisor进程通过exit(ChildPid, shutdown)terminate函数被调用
erlang:process_flag(trap_exit, true),
case startup(<<>>, Args) of
{ok, Port} ->
{os_pid, OSPid} = erlang:port_info(Port, os_pid),
lager:debug("[efka_service] service: ~p, port: ~p, boot_service success os_pid: ~p", [ContainerId, Port, OSPid]),
{ok, #state{container_id = ContainerId, port = Port, os_pid = OSPid}};
{error, Reason} ->
lager:debug("[efka_service] service: ~p", [ContainerId]),
{stop, Reason}
case efka_docker_command:is_container_running(ContainerId) of
true ->
{ok, Port} = wait_container(ContainerId),
{ok, #state{container_id = ContainerId, status = ?STATUS_RUNNING, port = Port}};
false ->
lager:debug("[efka_service] container_id: ~p", [ContainerId]),
{ok, #state{container_id = ContainerId, status = ?STATUS_STOPPED}}
end.
%% @private
@ -136,12 +135,12 @@ handle_call(_Request, _From, State = #state{}) ->
handle_cast({metric_data, DeviceUUID, RouteKey, Metric}, State = #state{container_id = ContainerId, meta = Meta}) ->
lager:debug("[efka_service] container_id: ~p, meta: ~p, device_uuid: ~p, route_key: ~p, metric data: ~p",
[ContainerId, Meta, DeviceUUID, RouteKey, Metric]),
%% meta相关的数据, todo container_id的数据是否需要加上
efka_remote_agent:metric_data(ContainerId, Meta, DeviceUUID, RouteKey, Metric),
%% meta相关的数据
efka_remote_agent:metric_data(Meta, DeviceUUID, RouteKey, Metric),
{noreply, State};
handle_cast({send_event, EventType, Params}, State = #state{container_id = ContainerId, meta = Meta}) ->
efka_remote_agent:event(ContainerId, Meta, EventType, Params),
efka_remote_agent:event(Meta, EventType, Params),
lager:debug("[efka_service] send_event, container_id: ~p, meta: ~p, event_type: ~p, params: ~p", [ContainerId, Meta, EventType, Params]),
{noreply, State};
@ -178,19 +177,6 @@ handle_cast(_Request, State = #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
%%
handle_info({timeout, _, reboot_service}, State = #state{container_id = ContainerId, manifest = Manifest}) ->
case startup(Manifest, []) of
{ok, Port} ->
{os_pid, OSPid} = erlang:port_info(Port, os_pid),
lager:debug("[efka_service] service_id: ~p, reboot success, port: ~p, os_pid: ~p", [ContainerId, Port, OSPid]),
{noreply, State#state{port = Port, os_pid = OSPid}};
{error, Reason} ->
lager:debug("[efka_service] service_id: ~p, boot_service get error: ~p", [ContainerId, Reason]),
try_reboot(),
{noreply, State}
end;
%% channel的回复
handle_info({channel_reply, Ref, Reply}, State = #state{inflight = Inflight, callbacks = Callbacks}) ->
case maps:take(Ref, Inflight) of
@ -209,13 +195,12 @@ handle_info({Port, {data, Data}}, State = #state{container_id = ContainerId}) wh
%% port的消息, Port的被动关闭会触发Port和State.port的值是相等的
handle_info({Port, {exit_status, Code}}, State = #state{container_id = ContainerId}) when is_port(Port) ->
lager:debug("[efka_service] service_id: ~p, port: ~p, exit with code: ~p", [ContainerId, Port, Code]),
{noreply, State#state{port = undefined, os_pid = undefined}};
{noreply, State#state{port = undefined}};
%% port的退出消息
handle_info({'EXIT', Port, Reason}, State = #state{container_id = ContainerId}) when is_port(Port) ->
lager:debug("[efka_service] service_id: ~p, port: ~p, exit with reason: ~p", [ContainerId, Port, Reason]),
try_reboot(),
{noreply, State#state{port = undefined, os_pid = undefined}};
{noreply, State#state{port = undefined}};
%% channel进程的退出
handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, container_id = ContainerId}) ->
@ -229,9 +214,8 @@ handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(Reason, _State = #state{container_id = ContainerId, port = Port, os_pid = OSPid}) ->
terminate(Reason, _State = #state{container_id = ContainerId, port = Port}) ->
erlang:is_port(Port) andalso erlang:port_close(Port),
catch kill_os_pid(OSPid),
lager:debug("[efka_service] service_id: ~p, terminate with reason: ~p", [ContainerId, Reason]),
ok.
@ -247,32 +231,17 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
%%% Internal functions
%%%===================================================================
startup(ExecCmd0, Args0) ->
PortSettings = [
{args, [binary_to_list(A) || A <- Args0]},
exit_status
],
ExecCmd = binary_to_list(ExecCmd0),
case catch erlang:open_port({spawn_executable, ExecCmd}, PortSettings) of
-spec wait_container(ContainerId :: binary()) -> {ok, Port :: erlang:port()} | error.
wait_container(ContainerId) when is_binary(ContainerId) ->
PortSettings = [stream, exit_status, use_stdio, binary],
ExecCmd = "docker wait " ++ binary_to_list(ContainerId),
case catch erlang:open_port({spawn, ExecCmd}, PortSettings) of
Port when is_port(Port) ->
{ok, Port};
_Other ->
{error, <<"exec command startup failed">>}
error
end.
%%
-spec kill_os_pid(port() | undefined) -> no_return().
kill_os_pid(undefined) ->
ok;
kill_os_pid(OSPid) when is_integer(OSPid) ->
Cmd = lists:flatten(io_lib:format("kill -9 ~p", [OSPid])),
lager:debug("kill cmd is: ~p", [Cmd]),
os:cmd(Cmd).
-spec try_reboot() -> no_return().
try_reboot() ->
erlang:start_timer(5000, self(), reboot_service).
-spec trigger_callback(Ref :: reference(), Callbacks :: map()) -> NewCallbacks :: map().
trigger_callback(Ref, Callbacks) ->
case maps:take(Ref, Callbacks) of

View File

@ -11,7 +11,7 @@
%% API
-export([pull_image/1, check_image_exist/1]).
-export([create_container/3, check_container_exist/1]).
-export([create_container/3, check_container_exist/1, is_container_running/1]).
-spec pull_image(Image :: binary()) -> ok | {error, Reason :: any()}.
pull_image(Image) when is_binary(Image) ->
@ -65,6 +65,24 @@ create_container(ContainerName, ContainerDir, Config) when is_binary(ContainerNa
{error, <<"exec command startup failed">>}
end.
-spec is_container_running(ContainerId :: binary()) -> boolean().
is_container_running(ContainerId) when is_binary(ContainerId) ->
%% todo {stderr_to_stdout, true}
PortSettings = [stream, exit_status, use_stdio, binary],
ExecCmd = "docker inspect -f '{{.State.Running}}' " ++ binary_to_list(ContainerId),
case catch erlang:open_port({spawn, ExecCmd}, PortSettings) of
Port when is_port(Port) ->
case gather_output(Port) of
{0, Val0} ->
Val = string:trim(Val0),
Val =:= <<"true">>;
_ ->
false
end;
_Error ->
false
end.
-spec check_image_exist(Image :: binary()) -> boolean().
check_image_exist(Image) when is_binary(Image) ->
PortSettings = [stream, exit_status, use_stdio, binary],

View File

@ -46,13 +46,13 @@
%%%===================================================================
%%
-spec metric_data(ServiceId :: binary(), DeviceUUID::binary(), RouteKey :: binary(), Metric :: binary()) -> no_return().
metric_data(ServiceId, DeviceUUID, RouteKey, Metric) when is_binary(ServiceId), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Metric) ->
gen_statem:cast(?SERVER, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}).
-spec metric_data(MetaTag :: binary(), DeviceUUID::binary(), RouteKey :: binary(), Metric :: binary()) -> no_return().
metric_data(MetaTag, DeviceUUID, RouteKey, Metric) when is_binary(MetaTag), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Metric) ->
gen_statem:cast(?SERVER, {metric_data, MetaTag, DeviceUUID, RouteKey, Metric}).
-spec event(ServiceId :: binary(), EventType :: integer(), Params :: binary()) -> no_return().
event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventType), is_binary(Params) ->
gen_statem:cast(?SERVER, {event, ServiceId, EventType, Params}).
-spec event(MetaTag :: binary(), EventType :: integer(), Params :: binary()) -> no_return().
event(MetaTag, EventType, Params) when is_binary(MetaTag), is_integer(EventType), is_binary(Params) ->
gen_statem:cast(?SERVER, {event, MetaTag, EventType, Params}).
ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) ->
gen_statem:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}).
@ -108,9 +108,9 @@ handle_event({call, From}, {request_service_config, _ReceiverPid, _ServiceId}, _
{keep_state, State, [{reply, From, {error, <<"transport is not alive">>}}]};
%% , mnesia
handle_event(cast, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
handle_event(cast, {metric_data, MetaTag, DeviceUUID, RouteKey, Metric}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
Packet = message_pb:encode_msg(#data{
service_id = ServiceId,
meta_tag = MetaTag,
device_uuid = DeviceUUID,
route_key = RouteKey,
metric = Metric
@ -118,9 +118,9 @@ handle_event(cast, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}, ?STAT
efka_transport:send(TransportPid, ?METHOD_DATA, Packet),
{keep_state, State};
handle_event(cast, {metric_data, ServiceId, DeviceUUID, LineProtocolData}, _, State) ->
handle_event(cast, {metric_data, MetaTag, DeviceUUID, LineProtocolData}, _, State) ->
Packet = message_pb:encode_msg(#data{
service_id = ServiceId,
meta_tag = MetaTag,
device_uuid = DeviceUUID,
metric = LineProtocolData
}),

View File

@ -382,7 +382,7 @@ encode_msg_push_service_config(#push_service_config{service_id = F1, config_json
encode_msg_data(Msg, TrUserData) -> encode_msg_data(Msg, <<>>, TrUserData).
encode_msg_data(#data{service_id = F1, device_uuid = F2, route_key = F3, metric = F4}, Bin, TrUserData) ->
encode_msg_data(#data{meta_tag = F1, device_uuid = F2, route_key = F3, metric = F4}, Bin, TrUserData) ->
B1 = if F1 == undefined -> Bin;
true ->
begin
@ -1281,18 +1281,18 @@ skip_64_push_service_config(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3,
decode_msg_data(Bin, TrUserData) -> dfp_read_field_def_data(Bin, 0, 0, 0, id(<<>>, TrUserData), id(<<>>, TrUserData), id(<<>>, TrUserData), id(<<>>, TrUserData), TrUserData).
dfp_read_field_def_data(<<10, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> d_field_data_service_id(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData);
dfp_read_field_def_data(<<10, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> d_field_data_meta_tag(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData);
dfp_read_field_def_data(<<18, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> d_field_data_device_uuid(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData);
dfp_read_field_def_data(<<26, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> d_field_data_route_key(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData);
dfp_read_field_def_data(<<34, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> d_field_data_metric(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData);
dfp_read_field_def_data(<<>>, 0, 0, _, F@_1, F@_2, F@_3, F@_4, _) -> #data{service_id = F@_1, device_uuid = F@_2, route_key = F@_3, metric = F@_4};
dfp_read_field_def_data(<<>>, 0, 0, _, F@_1, F@_2, F@_3, F@_4, _) -> #data{meta_tag = F@_1, device_uuid = F@_2, route_key = F@_3, metric = F@_4};
dfp_read_field_def_data(Other, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> dg_read_field_def_data(Other, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData).
dg_read_field_def_data(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData) when N < 32 - 7 -> dg_read_field_def_data(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData);
dg_read_field_def_data(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, F@_2, F@_3, F@_4, TrUserData) ->
Key = X bsl N + Acc,
case Key of
10 -> d_field_data_service_id(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, TrUserData);
10 -> d_field_data_meta_tag(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, TrUserData);
18 -> d_field_data_device_uuid(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, TrUserData);
26 -> d_field_data_route_key(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, TrUserData);
34 -> d_field_data_metric(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, TrUserData);
@ -1305,10 +1305,10 @@ dg_read_field_def_data(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, F@_2, F@_3, F
5 -> skip_32_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, F@_4, TrUserData)
end
end;
dg_read_field_def_data(<<>>, 0, 0, _, F@_1, F@_2, F@_3, F@_4, _) -> #data{service_id = F@_1, device_uuid = F@_2, route_key = F@_3, metric = F@_4}.
dg_read_field_def_data(<<>>, 0, 0, _, F@_1, F@_2, F@_3, F@_4, _) -> #data{meta_tag = F@_1, device_uuid = F@_2, route_key = F@_3, metric = F@_4}.
d_field_data_service_id(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData) when N < 57 -> d_field_data_service_id(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData);
d_field_data_service_id(<<0:1, X:7, Rest/binary>>, N, Acc, F, _, F@_2, F@_3, F@_4, TrUserData) ->
d_field_data_meta_tag(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData) when N < 57 -> d_field_data_meta_tag(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData);
d_field_data_meta_tag(<<0:1, X:7, Rest/binary>>, N, Acc, F, _, F@_2, F@_3, F@_4, TrUserData) ->
{NewFValue, RestF} = begin Len = X bsl N + Acc, <<Bytes:Len/binary, Rest2/binary>> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end,
dfp_read_field_def_data(RestF, 0, 0, F, NewFValue, F@_2, F@_3, F@_4, TrUserData).
@ -1932,10 +1932,10 @@ merge_msg_push_service_config(#push_service_config{service_id = PFservice_id, co
end}.
-compile({nowarn_unused_function,merge_msg_data/3}).
merge_msg_data(#data{service_id = PFservice_id, device_uuid = PFdevice_uuid, route_key = PFroute_key, metric = PFmetric}, #data{service_id = NFservice_id, device_uuid = NFdevice_uuid, route_key = NFroute_key, metric = NFmetric}, _) ->
#data{service_id =
if NFservice_id =:= undefined -> PFservice_id;
true -> NFservice_id
merge_msg_data(#data{meta_tag = PFmeta_tag, device_uuid = PFdevice_uuid, route_key = PFroute_key, metric = PFmetric}, #data{meta_tag = NFmeta_tag, device_uuid = NFdevice_uuid, route_key = NFroute_key, metric = NFmetric}, _) ->
#data{meta_tag =
if NFmeta_tag =:= undefined -> PFmeta_tag;
true -> NFmeta_tag
end,
device_uuid =
if NFdevice_uuid =:= undefined -> PFdevice_uuid;
@ -2201,9 +2201,9 @@ v_msg_push_service_config(X, Path, _TrUserData) -> mk_type_error({expected_msg,
-compile({nowarn_unused_function,v_msg_data/3}).
-dialyzer({nowarn_function,v_msg_data/3}).
v_msg_data(#data{service_id = F1, device_uuid = F2, route_key = F3, metric = F4}, Path, TrUserData) ->
v_msg_data(#data{meta_tag = F1, device_uuid = F2, route_key = F3, metric = F4}, Path, TrUserData) ->
if F1 == undefined -> ok;
true -> v_type_string(F1, [service_id | Path], TrUserData)
true -> v_type_string(F1, [meta_tag | Path], TrUserData)
end,
if F2 == undefined -> ok;
true -> v_type_string(F2, [device_uuid | Path], TrUserData)
@ -2415,7 +2415,7 @@ get_msg_defs() ->
#field{name = config_json, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []},
#field{name = timeout, fnum = 3, rnum = 4, type = uint32, occurrence = optional, opts = []}]},
{{msg, data},
[#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []},
[#field{name = meta_tag, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []},
#field{name = device_uuid, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []},
#field{name = route_key, fnum = 3, rnum = 4, type = string, occurrence = optional, opts = []},
#field{name = metric, fnum = 4, rnum = 5, type = bytes, occurrence = optional, opts = []}]},
@ -2494,7 +2494,7 @@ find_msg_def(push_service_config) ->
#field{name = config_json, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []},
#field{name = timeout, fnum = 3, rnum = 4, type = uint32, occurrence = optional, opts = []}];
find_msg_def(data) ->
[#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []},
[#field{name = meta_tag, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []},
#field{name = device_uuid, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []},
#field{name = route_key, fnum = 3, rnum = 4, type = string, occurrence = optional, opts = []},
#field{name = metric, fnum = 4, rnum = 5, type = bytes, occurrence = optional, opts = []}];

View File

@ -61,7 +61,7 @@ message PushServiceConfig {
//
message Data {
string service_id = 1;
string meta_tag = 1;
string device_uuid = 2;
string route_key = 3;
// measurement[,tag_key=tag_value...] field_key=field_value[,field_key2=field_value2...] [timestamp]