From 41c1a7cf78eb4c748512ea7f0a3cd6fb62122721 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 9 May 2025 16:02:59 +0800 Subject: [PATCH] fix --- apps/efka/src/efka_agent.erl | 21 +++++++++++---------- apps/efka/src/efka_service.erl | 6 +++--- message_pb.proto | 1 + 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 74e5d93..4a9d752 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -49,6 +49,10 @@ metric_data(ServiceId, DeviceUUID, LineProtocolData) when is_binary(ServiceId), is_binary(DeviceUUID), is_binary(LineProtocolData) -> gen_server:cast(?SERVER, {metric_data, ServiceId, DeviceUUID, LineProtocolData}). +-spec event(ServiceId :: binary(), EventType :: integer(), Params :: binary()) -> no_return(). +event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventType), is_binary(Params) -> + gen_server:cast(?SERVER, {event, ServiceId, EventType, Params}). + ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) -> gen_server:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}). @@ -56,10 +60,6 @@ ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, Cp feedback_phase(TaskId, Timestamp, Phase) when is_integer(TaskId), is_integer(Timestamp), is_binary(Phase) -> gen_server:cast(?SERVER, {feedback_phase, TaskId, Timestamp, Phase}). --spec event(ServiceId :: binary(), EventType :: integer(), Params :: binary()) -> no_return(). -event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventType), is_binary(Params) -> - gen_server:cast(?SERVER, {event, ServiceId, EventType, Params}). - %% @doc Spawns the server and registers the local name (unique) -spec(start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). @@ -236,13 +236,14 @@ handle_info({auth_reply, Reply}, State = #state{status = ?STATE_AUTH, transport_ handle_info({server_push, PacketId, <>}, State = #state{transport_pid = TransportPid}) -> #deploy{task_id = TaskId, service_id = ServiceId, tar_url = TarUrl} = message_pb:decode_msg(DeployBin, deploy), + %% 短暂的等待,efka_inetd收到消息后就立即返回了 Reply = case efka_inetd:deploy(TaskId, ServiceId, TarUrl) of ok -> #push_reply{code = 1, message = <<"">>}; {error, Reason} when is_binary(Reason) -> - #push_reply{code = 1, message = Reason} + #push_reply{code = 0, message = Reason} end, - efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)), + is_pid(TransportPid) andalso efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)), {noreply, State}; @@ -252,13 +253,13 @@ handle_info({server_push, PacketId, <> case efka_service:get_pid(ServiceId) of undefined -> - Reply = #push_reply{code = 0, message = <<"service not run">>, result = <<>>}, - efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)), + Reply = #push_reply{code = 0, message = <<"service not run">>}, + is_pid(TransportPid) andalso efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)), {noreply, State}; ServicePid when is_pid(ServicePid) -> Ref = make_ref(), + %% 将配置文件推送到对应的微服务 efka_service:push_config(ServicePid, Ref, ConfigJson), - %% 处理超时逻辑 erlang:start_timer(Timeout * 1000, self(), {request_timeout, Ref}), @@ -310,7 +311,7 @@ handle_info({server_pub, Topic, Content}, State = #state{status = ?STATE_ACTIVAT {noreply, State}; %% 收到来自efka_service的回复 -handle_info({ems_reply, Ref, EmsReply}, State = #state{inflight = Inflight}) -> +handle_info({service_reply, Ref, EmsReply}, State = #state{inflight = Inflight}) -> case maps:take(Ref, Inflight) of error -> {noreply, State}; diff --git a/apps/efka/src/efka_service.erl b/apps/efka/src/efka_service.erl index 5a12472..e69c7bb 100644 --- a/apps/efka/src/efka_service.erl +++ b/apps/efka/src/efka_service.erl @@ -227,7 +227,7 @@ handle_cast({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{running_ efka_tcp_channel:push_config(ChannelPid, Ref, self(), ConfigJson), {noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}}; false -> - ReceiverPid ! {ems_reply, Ref, {error, <<"channel is not alive">>}}, + ReceiverPid ! {service_reply, Ref, {error, <<"channel is not alive">>}}, {reply, State} end; @@ -238,7 +238,7 @@ handle_cast({invoke, Ref, ReceiverPid, Payload}, State = #state{running_status = efka_tcp_channel:invoke(ChannelPid, Ref, self(), Payload), {noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}}; false -> - ReceiverPid ! {ems_reply, Ref, {error, <<"channel is not alive">>}}, + ReceiverPid ! {service_reply, Ref, {error, <<"channel is not alive">>}}, {reply, State} end; @@ -275,7 +275,7 @@ handle_info({channel_reply, Ref, Reply}, State = #state{inflight = Inflight}) -> error -> {noreply, State}; {ReceiverPid, NInflight} -> - ReceiverPid ! {ems_reply, Ref, Reply}, + ReceiverPid ! {service_reply, Ref, Reply}, {noreply, State#state{inflight = NInflight}} end; diff --git a/message_pb.proto b/message_pb.proto index f51bc15..bc4e0bd 100644 --- a/message_pb.proto +++ b/message_pb.proto @@ -25,6 +25,7 @@ message Pub { ///// 服务器主动推送的消息 message PushReply { + // 0: 表示失败,1: 成功 uint32 code = 1; string result = 2; string message = 3;