diff --git a/apps/efka/src/efka_remote_agent.erl b/apps/efka/src/efka_remote_agent.erl index c93023d..f4875be 100644 --- a/apps/efka/src/efka_remote_agent.erl +++ b/apps/efka/src/efka_remote_agent.erl @@ -46,13 +46,13 @@ %%%=================================================================== %% 发送数据 --spec metric_data(MetaTag :: binary(), DeviceUUID::binary(), RouteKey :: binary(), Metric :: binary()) -> no_return(). -metric_data(MetaTag, DeviceUUID, RouteKey, Metric) when is_binary(MetaTag), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Metric) -> - gen_statem:cast(?SERVER, {metric_data, MetaTag, DeviceUUID, RouteKey, Metric}). +-spec metric_data(ServiceId :: binary(), DeviceUUID::binary(), RouteKey :: binary(), Metric :: binary()) -> no_return(). +metric_data(ServiceId, DeviceUUID, RouteKey, Metric) when is_binary(ServiceId), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Metric) -> + gen_statem:cast(?SERVER, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}). --spec event(MetaTag :: binary(), EventType :: integer(), Params :: binary()) -> no_return(). -event(MetaTag, EventType, Params) when is_binary(MetaTag), is_integer(EventType), is_binary(Params) -> - gen_statem:cast(?SERVER, {event, MetaTag, EventType, Params}). +-spec event(ServiceId :: binary(), EventType :: integer(), Params :: binary()) -> no_return(). +event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventType), is_binary(Params) -> + gen_statem:cast(?SERVER, {event, ServiceId, EventType, Params}). ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) -> gen_statem:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}). @@ -108,9 +108,9 @@ handle_event({call, From}, {request_service_config, _ReceiverPid, _ServiceId}, _ {keep_state, State, [{reply, From, {error, <<"transport is not alive">>}}]}; %% 异步发送数据, 连接存在时候直接发送;否则缓存到mnesia -handle_event(cast, {metric_data, MetaTag, DeviceUUID, RouteKey, Metric}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(cast, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> Packet = message_pb:encode_msg(#data{ - meta_tag = MetaTag, + service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric @@ -118,9 +118,9 @@ handle_event(cast, {metric_data, MetaTag, DeviceUUID, RouteKey, Metric}, ?STATE_ efka_transport:send(TransportPid, ?METHOD_DATA, Packet), {keep_state, State}; -handle_event(cast, {metric_data, MetaTag, DeviceUUID, LineProtocolData}, _, State) -> +handle_event(cast, {metric_data, ServiceId, DeviceUUID, LineProtocolData}, _, State) -> Packet = message_pb:encode_msg(#data{ - meta_tag = MetaTag, + service_id = ServiceId, device_uuid = DeviceUUID, metric = LineProtocolData }), @@ -241,7 +241,7 @@ handle_event(info, {server_async_call, PacketId, < %% 短暂的等待,efka_inetd收到消息后就立即返回了 - Reply = case efka_inetd:deploy(TaskId, Config) of + Reply = case docker_manager:deploy(TaskId, Config) of ok -> #async_call_reply{code = 1, result = <<"ok">>}; {error, Reason} when is_binary(Reason) -> @@ -257,7 +257,7 @@ handle_event(info, {server_async_call, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> %% 短暂的等待,efka_inetd收到消息后就立即返回了 - Reply = case efka_inetd:start_container(ServiceId) of + Reply = case docker_manager:start_container(ServiceId) of ok -> #async_call_reply{code = 1, result = <<"ok">>}; {error, Reason} when is_binary(Reason) -> @@ -270,7 +270,7 @@ handle_event(info, {server_async_call, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> %% 短暂的等待,efka_inetd收到消息后就立即返回了 - Reply = case efka_inetd:stop_container(ServiceId) of + Reply = case docker_manager:stop_container(ServiceId) of ok -> #async_call_reply{code = 1, result = <<"ok">>}; {error, Reason} when is_binary(Reason) -> @@ -282,7 +282,7 @@ handle_event(info, {server_async_call, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid, push_inflight = PushInflight}) -> - #push_service_config{service_id = ServiceId, config_json = ConfigJson, timeout = Timeout} = message_pb:decode_msg(ConfigBin, push_service_config), + #service_config{service_id = ServiceId, config_json = ConfigJson, timeout = Timeout} = message_pb:decode_msg(ConfigBin, push_service_config), case efka_service:get_pid(ServiceId) of undefined ->