This commit is contained in:
anlicheng 2025-09-17 10:57:18 +08:00
parent c9d9d6efaf
commit 71297abd93

View File

@ -46,13 +46,13 @@
%%%=================================================================== %%%===================================================================
%% %%
-spec metric_data(MetaTag :: binary(), DeviceUUID::binary(), RouteKey :: binary(), Metric :: binary()) -> no_return(). -spec metric_data(ServiceId :: binary(), DeviceUUID::binary(), RouteKey :: binary(), Metric :: binary()) -> no_return().
metric_data(MetaTag, DeviceUUID, RouteKey, Metric) when is_binary(MetaTag), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Metric) -> metric_data(ServiceId, DeviceUUID, RouteKey, Metric) when is_binary(ServiceId), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Metric) ->
gen_statem:cast(?SERVER, {metric_data, MetaTag, DeviceUUID, RouteKey, Metric}). gen_statem:cast(?SERVER, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}).
-spec event(MetaTag :: binary(), EventType :: integer(), Params :: binary()) -> no_return(). -spec event(ServiceId :: binary(), EventType :: integer(), Params :: binary()) -> no_return().
event(MetaTag, EventType, Params) when is_binary(MetaTag), is_integer(EventType), is_binary(Params) -> event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventType), is_binary(Params) ->
gen_statem:cast(?SERVER, {event, MetaTag, EventType, Params}). gen_statem:cast(?SERVER, {event, ServiceId, EventType, Params}).
ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) -> ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) ->
gen_statem:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}). gen_statem:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}).
@ -108,9 +108,9 @@ handle_event({call, From}, {request_service_config, _ReceiverPid, _ServiceId}, _
{keep_state, State, [{reply, From, {error, <<"transport is not alive">>}}]}; {keep_state, State, [{reply, From, {error, <<"transport is not alive">>}}]};
%% , mnesia %% , mnesia
handle_event(cast, {metric_data, MetaTag, DeviceUUID, RouteKey, Metric}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> handle_event(cast, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
Packet = message_pb:encode_msg(#data{ Packet = message_pb:encode_msg(#data{
meta_tag = MetaTag, service_id = ServiceId,
device_uuid = DeviceUUID, device_uuid = DeviceUUID,
route_key = RouteKey, route_key = RouteKey,
metric = Metric metric = Metric
@ -118,9 +118,9 @@ handle_event(cast, {metric_data, MetaTag, DeviceUUID, RouteKey, Metric}, ?STATE_
efka_transport:send(TransportPid, ?METHOD_DATA, Packet), efka_transport:send(TransportPid, ?METHOD_DATA, Packet),
{keep_state, State}; {keep_state, State};
handle_event(cast, {metric_data, MetaTag, DeviceUUID, LineProtocolData}, _, State) -> handle_event(cast, {metric_data, ServiceId, DeviceUUID, LineProtocolData}, _, State) ->
Packet = message_pb:encode_msg(#data{ Packet = message_pb:encode_msg(#data{
meta_tag = MetaTag, service_id = ServiceId,
device_uuid = DeviceUUID, device_uuid = DeviceUUID,
metric = LineProtocolData metric = LineProtocolData
}), }),
@ -241,7 +241,7 @@ handle_event(info, {server_async_call, PacketId, <<?PUSH_DEPLOY:8, DeployBin/bin
case catch jiffy:decode(Config0, [return_maps]) of case catch jiffy:decode(Config0, [return_maps]) of
Config when is_map(Config) -> Config when is_map(Config) ->
%% efka_inetd收到消息后就立即返回了 %% efka_inetd收到消息后就立即返回了
Reply = case efka_inetd:deploy(TaskId, Config) of Reply = case docker_manager:deploy(TaskId, Config) of
ok -> ok ->
#async_call_reply{code = 1, result = <<"ok">>}; #async_call_reply{code = 1, result = <<"ok">>};
{error, Reason} when is_binary(Reason) -> {error, Reason} when is_binary(Reason) ->
@ -257,7 +257,7 @@ handle_event(info, {server_async_call, PacketId, <<?PUSH_DEPLOY:8, DeployBin/bin
%% %%
handle_event(info, {server_async_call, PacketId, <<?PUSH_START_CONTAINER:8, ServiceId/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> handle_event(info, {server_async_call, PacketId, <<?PUSH_START_CONTAINER:8, ServiceId/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
%% efka_inetd收到消息后就立即返回了 %% efka_inetd收到消息后就立即返回了
Reply = case efka_inetd:start_container(ServiceId) of Reply = case docker_manager:start_container(ServiceId) of
ok -> ok ->
#async_call_reply{code = 1, result = <<"ok">>}; #async_call_reply{code = 1, result = <<"ok">>};
{error, Reason} when is_binary(Reason) -> {error, Reason} when is_binary(Reason) ->
@ -270,7 +270,7 @@ handle_event(info, {server_async_call, PacketId, <<?PUSH_START_CONTAINER:8, Serv
%% %%
handle_event(info, {server_async_call, PacketId, <<?PUSH_STOP_CONTAINER:8, ServiceId/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> handle_event(info, {server_async_call, PacketId, <<?PUSH_STOP_CONTAINER:8, ServiceId/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
%% efka_inetd收到消息后就立即返回了 %% efka_inetd收到消息后就立即返回了
Reply = case efka_inetd:stop_container(ServiceId) of Reply = case docker_manager:stop_container(ServiceId) of
ok -> ok ->
#async_call_reply{code = 1, result = <<"ok">>}; #async_call_reply{code = 1, result = <<"ok">>};
{error, Reason} when is_binary(Reason) -> {error, Reason} when is_binary(Reason) ->
@ -282,7 +282,7 @@ handle_event(info, {server_async_call, PacketId, <<?PUSH_STOP_CONTAINER:8, Servi
%% config.json配置信息 %% config.json配置信息
handle_event(info, {server_async_call, PacketId, <<?PUSH_SERVICE_CONFIG:8, ConfigBin/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid, push_inflight = PushInflight}) -> handle_event(info, {server_async_call, PacketId, <<?PUSH_SERVICE_CONFIG:8, ConfigBin/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid, push_inflight = PushInflight}) ->
#push_service_config{service_id = ServiceId, config_json = ConfigJson, timeout = Timeout} = message_pb:decode_msg(ConfigBin, push_service_config), #service_config{service_id = ServiceId, config_json = ConfigJson, timeout = Timeout} = message_pb:decode_msg(ConfigBin, push_service_config),
case efka_service:get_pid(ServiceId) of case efka_service:get_pid(ServiceId) of
undefined -> undefined ->