From 5787f797c3c04aef037eb1df43a74c425c7891ff Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 20 May 2025 16:09:00 +0800 Subject: [PATCH] fix client --- apps/efka/include/efka.hrl | 2 +- apps/efka/src/client/efka_client.erl | 2 ++ apps/efka/src/client/efka_client_broker.erl | 2 +- apps/efka/src/efka_agent.erl | 5 +++-- apps/efka/src/efka_inetd_task.erl | 2 +- apps/efka/src/efka_inetd_task_log.erl | 2 +- 6 files changed, 9 insertions(+), 6 deletions(-) diff --git a/apps/efka/include/efka.hrl b/apps/efka/include/efka.hrl index 192544c..348d40d 100644 --- a/apps/efka/include/efka.hrl +++ b/apps/efka/include/efka.hrl @@ -50,4 +50,4 @@ -define(PUSH_SERVICE_CONFIG, 16#04). -define(PUSH_INVOKE, 16#05). --define(PUSH_TASK_LOG, 16#05). +-define(PUSH_TASK_LOG, 16#06). diff --git a/apps/efka/src/client/efka_client.erl b/apps/efka/src/client/efka_client.erl index 3a0ff74..aefa022 100644 --- a/apps/efka/src/client/efka_client.erl +++ b/apps/efka/src/client/efka_client.erl @@ -263,6 +263,8 @@ handle_info({tcp, Socket, <>}, State = #state{socke ControllerPid ! {invoke, self(), Ref, Payload}, Reply = receive + {invoke_reply, Ref, ok} -> + #{<<"id">> => Id, <<"result">> => <<"ok">>}; {invoke_reply, Ref, {ok, Result}} -> #{<<"id">> => Id, <<"result">> => Result}; {invoke_reply, Ref, {error, Reason}} when is_binary(Reason) -> diff --git a/apps/efka/src/client/efka_client_broker.erl b/apps/efka/src/client/efka_client_broker.erl index 5a36762..2c65e42 100644 --- a/apps/efka/src/client/efka_client_broker.erl +++ b/apps/efka/src/client/efka_client_broker.erl @@ -82,7 +82,7 @@ handle_info({push_config, ReceiverPid, Ref, Config}, State = #state{}) -> {noreply, State}; handle_info({invoke, ReceiverPid, Ref, Payload}, State = #state{}) -> lager:debug("[efka_client_broker] get invoke: ~p", [Payload]), - ReceiverPid ! {invoke_reply, Ref, ok}, + ReceiverPid ! {invoke_reply, Ref, {ok, <<"yes invoke me">>}}, {noreply, State}. %% @private diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 720c168..a911efa 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -299,6 +299,7 @@ handle_info({server_push, PacketId, <>}, State %% 处理task_log handle_info({server_push, PacketId, <>}, State = #state{status = ?STATE_ACTIVATED}) -> #fetch_task_log{task_id = TaskId} = message_pb:decode_msg(TaskLogBin, fetch_task_log), + lager:debug("[efka_agent] get task_log request: ~p", [TaskId]), {ok, Logs} = efka_inetd_task_log:get_logs(TaskId), Reply = case length(Logs) > 0 of true -> @@ -340,8 +341,8 @@ handle_info({service_reply, Ref, EmsReply}, State = #state{inflight = Inflight}) {noreply, State}; {PacketId, NInflight} -> Reply = case EmsReply of - ok -> - #async_call_reply{code = 1, message = <<"">>, result = <<>>}; + {ok, Result} -> + #async_call_reply{code = 1, result = Result}; {error, Reason} -> #async_call_reply{code = 0, message = Reason} end, diff --git a/apps/efka/src/efka_inetd_task.erl b/apps/efka/src/efka_inetd_task.erl index cc4176a..24e2276 100644 --- a/apps/efka/src/efka_inetd_task.erl +++ b/apps/efka/src/efka_inetd_task.erl @@ -114,7 +114,7 @@ code_change(_OldVsn, State = #state{}, _Extra) -> do_deploy(TaskId, ServiceRootDir, ServiceId, TarUrl) when is_integer(TaskId), is_list(ServiceRootDir), is_binary(ServiceId), is_binary(TarUrl) -> case download(binary_to_list(TarUrl), ServiceRootDir) of {ok, TarFile, CostTs} -> - Log = io_lib:format("[efka_inetd_task] download: ~p completed, cost time: ~p(ms)", [binary_to_list(TarUrl), CostTs]), + Log = io_lib:format("download: ~p completed, cost time: ~p(ms)", [binary_to_list(TarUrl), CostTs]), efka_inetd_task_log:stash(TaskId, list_to_binary(Log)), %% 创建工作目录 diff --git a/apps/efka/src/efka_inetd_task_log.erl b/apps/efka/src/efka_inetd_task_log.erl index d8c1b20..9ce361b 100644 --- a/apps/efka/src/efka_inetd_task_log.erl +++ b/apps/efka/src/efka_inetd_task_log.erl @@ -35,7 +35,7 @@ stash(TaskId, Log) when is_integer(TaskId), is_binary(Log) -> stash(TaskId, Items) when is_integer(TaskId), is_list(Items) -> {{Y, M, D}, {H, I, S}} = calendar:local_time(), TimePrefix = iolist_to_binary(io_lib:format("[~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b]", [Y, M, D, H, I, S])), - Log = iolist_to_binary([TimePrefix, <<" ">>, lists:join(<<" ">>, Items), <<$\n>>]), + Log = iolist_to_binary([TimePrefix, <<" ">>, lists:join(<<" ">>, Items)]), gen_server:cast(?SERVER, {stash, TaskId, Log}). -spec flush(TaskId :: integer()) -> no_return().