From 2e2865a62f10ff17393da3149308c56b3dd1092a Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 3 Jun 2025 13:05:53 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=9F=E4=B8=80=E6=A6=82=E5=BF=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/efka/src/efka_agent.erl | 14 +++++++------- apps/efka/src/efka_transport.erl | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index b96c621..9732b78 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -162,7 +162,7 @@ handle_event(cast, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelA efka_transport:send(TransportPid, ?METHOD_PING, Ping), {keep_state, State}; -handle_event(info, {timeout, _, create_transport}, ?STATE_ACTIVATED, State) -> +handle_event(info, {timeout, _, create_transport}, ?STATE_DENIED, State) -> {ok, Props} = application:get_env(efka, tls_server), Host = proplists:get_value(host, Props), Port = proplists:get_value(port, Props), @@ -223,7 +223,7 @@ handle_event(info, {auth_reply, Reply}, ?STATE_AUTH, State = #state{transport_pi %% 激活消息 %% 微服务部署 -handle_event(info, {server_push, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_async_call, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> #deploy{task_id = TaskId, service_id = ServiceId, tar_url = TarUrl} = message_pb:decode_msg(DeployBin, deploy), %% 短暂的等待,efka_inetd收到消息后就立即返回了 @@ -238,7 +238,7 @@ handle_event(info, {server_push, PacketId, <>} {keep_state, State}; %% 启动微服务 -handle_event(info, {server_push, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_async_call, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> %% 短暂的等待,efka_inetd收到消息后就立即返回了 Reply = case efka_inetd:start_service(ServiceId) of ok -> @@ -251,7 +251,7 @@ handle_event(info, {server_push, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_async_call, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> %% 短暂的等待,efka_inetd收到消息后就立即返回了 Reply = case efka_inetd:stop_service(ServiceId) of ok -> @@ -264,7 +264,7 @@ handle_event(info, {server_push, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid, push_inflight = PushInflight}) -> +handle_event(info, {server_async_call, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid, push_inflight = PushInflight}) -> #push_service_config{service_id = ServiceId, config_json = ConfigJson, timeout = Timeout} = message_pb:decode_msg(ConfigBin, push_service_config), case efka_service:get_pid(ServiceId) of @@ -283,7 +283,7 @@ handle_event(info, {server_push, PacketId, <>}, ?STATE_ACTIVATED, State = #state{push_inflight = PushInflight, transport_pid = TransportPid}) -> +handle_event(info, {server_async_call, PacketId, <>}, ?STATE_ACTIVATED, State = #state{push_inflight = PushInflight, transport_pid = TransportPid}) -> #invoke{service_id = ServiceId, payload = Payload, timeout = Timeout} = message_pb:decode_msg(InvokeBin, invoke), %% 消息发送到订阅系统 case efka_service:get_pid(ServiceId) of @@ -302,7 +302,7 @@ handle_event(info, {server_push, PacketId, <>} end; %% 处理task_log -handle_event(info, {server_push, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_async_call, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> #fetch_task_log{task_id = TaskId} = message_pb:decode_msg(TaskLogBin, fetch_task_log), lager:debug("[efka_agent] get task_log request: ~p", [TaskId]), {ok, Logs} = efka_inetd_task_log:get_logs(TaskId), diff --git a/apps/efka/src/efka_transport.erl b/apps/efka/src/efka_transport.erl index 19b03cc..b650121 100644 --- a/apps/efka/src/efka_transport.erl +++ b/apps/efka/src/efka_transport.erl @@ -83,7 +83,6 @@ start_link(ParentPid, Host, Port) when is_pid(ParentPid), is_list(Host), is_inte {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). init([ParentPid, Host, Port]) -> - ping_ticker(), {ok, #state{parent_pid = ParentPid, host = Host, port = Port, socket = undefined}}. %% @private @@ -115,6 +114,7 @@ handle_cast(connect, State = #state{host = Host, port = Port, parent_pid = Paren {ok, Socket} -> ok = ssl:controlling_process(Socket, self()), ParentPid ! {connect_reply, ok}, + ping_ticker(), {noreply, State#state{socket = Socket}}; {error, Reason} -> ParentPid ! {connect_reply, {error, Reason}}, @@ -169,7 +169,7 @@ handle_info({ssl, Socket, <>}, State = #state{socket {noreply, State}; handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> - ParentPid ! {server_push, PacketId, AsyncCallBin}, + ParentPid ! {server_async_call, PacketId, AsyncCallBin}, {noreply, State}; %% efka:request <-> iot:response