From bd9e3b68582d3a54631afe3480343e8587cebae5 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 6 May 2025 16:40:06 +0800 Subject: [PATCH] fix --- apps/efka/src/efka_agent.erl | 4 +-- apps/efka/src/efka_client.erl | 31 ++++++------------------ apps/efka/src/efka_micro_service.erl | 19 ++++++++++++--- apps/efka/src/efka_micro_service_sup.erl | 1 - 4 files changed, 25 insertions(+), 30 deletions(-) diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 9f9657d..165555b 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -217,7 +217,7 @@ handle_info({server_push_message, PacketId, <> }, efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)); - {ok, ServicePid} -> + ServicePid when is_pid(ServicePid) -> case efka_micro_service:push_arguments(ServicePid, Args) of ok -> Reply = #efka_response{ @@ -246,7 +246,7 @@ handle_info({server_push_message, PacketId, <> }, efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)); - {ok, ServicePid} -> + ServicePid when is_pid(ServicePid) -> case efka_micro_service:push_metrics(ServicePid, Metrics) of ok -> Reply = #efka_response { diff --git a/apps/efka/src/efka_client.erl b/apps/efka/src/efka_client.erl index fa4b34e..b1cbf1a 100644 --- a/apps/efka/src/efka_client.erl +++ b/apps/efka/src/efka_client.erl @@ -46,7 +46,7 @@ %% API -export([start_link/3]). -export([device_offline/1, device_online/1]). --export([send_metric_data/2, invoke_service/3, send_log/1, request_metric/0, request_param/0, send_event/2, controller_process/1]). +-export([send_metric_data/2, send_log/1, request_metric/0, request_param/0, send_event/2, controller_process/1]). -export([test/0]). @@ -54,7 +54,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -record(state, { - packet_id = 0 :: integer(), + packet_id = 1 :: integer(), host :: string(), port :: integer(), %% 请求后未完成的请求 @@ -75,12 +75,6 @@ send_metric_data(Fields, Tags) when is_list(Fields), is_map(Tags) -> {ok, Ref} = gen_server:call(?MODULE, {send_metric_data, self(), Fields, Tags}), await_reply(Ref, ?EFKA_REQUEST_TIMEOUT). --spec invoke_service(ToService :: binary(), Message :: map(), Timeout :: integer()) -> - {ok, Result :: any()} | {error, Reason :: any()}. -invoke_service(ToService, Message, Timeout) when is_binary(ToService), is_map(Message), is_integer(Timeout) -> - {ok, Ref} = gen_server:call(?MODULE, {invoke_service, self(), ToService, Message, Timeout}), - await_reply(Ref, ?EFKA_REQUEST_TIMEOUT). - -spec send_log(Message :: binary() | map()) -> no_return(). send_log(Message) when is_binary(Message); is_map(Message) -> gen_server:cast(?MODULE, {send_log, Message}). @@ -152,27 +146,18 @@ start_link(MicroServiceId, Host, Port) when is_binary(MicroServiceId), is_list(H init([MicroServiceId, Host, Port]) -> {ok, Socket} = gen_tcp:connect(Host, Port, [binary, {packet, 4}, {active, true}]), ok = gen_tcp:controlling_process(Socket, self()), - case do_register(MicroServiceId, Socket) of - ok -> - {ok, #state{packet_id = 1, host = Host, port = Port, socket = Socket}}; - {error, Reason} -> - {stop, Reason} - end. -%% 执行到efka服务器的注册 -do_register(MicroServiceId, Socket) -> - PacketId = 0, - Packet = <>, + Packet = <<1:32, ?PACKET_TYPE_REGISTER:8, MicroServiceId/binary>>, ok = gen_tcp:send(Socket, Packet), lager:debug("[efka_client] will send packet: ~p", [Packet]), receive - {tcp, Socket, <>} -> - ok; - {tcp, Socket, <>} -> - {error, Error} + {tcp, Socket, <<1:32, ?PACKET_TYPE_RESPONSE, 1:8>>} -> + {ok, #state{packet_id = 2, host = Host, port = Port, socket = Socket}}; + {tcp, Socket, <<1:32, ?PACKET_TYPE_RESPONSE, 0:8, Error/binary>>} -> + {stop, Error} after ?EFKA_REQUEST_TIMEOUT -> - {error, timeout} + {stop, register_timeout} end. %% @private diff --git a/apps/efka/src/efka_micro_service.erl b/apps/efka/src/efka_micro_service.erl index a2c2b42..f88622b 100644 --- a/apps/efka/src/efka_micro_service.erl +++ b/apps/efka/src/efka_micro_service.erl @@ -64,11 +64,11 @@ get_name(ServiceId) when is_binary(ServiceId) -> get_pid(ServiceId) when is_binary(ServiceId) -> whereis(get_name(ServiceId)). -push_arguments(Pid, Args) -> - ok. +push_arguments(Pid, Args) when is_pid(Pid), is_binary(Args) -> + gen_server:call(Pid, {push_arguments, Args}). -push_metrics(Pid, Metrics) -> - ok. +push_metrics(Pid, Metrics) when is_pid(Pid), is_binary(Metrics) -> + gen_server:call(Pid, {push_metrics, Metrics}). -spec attach_channel(pid(), pid()) -> ok | {error, Reason :: binary()}. attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) -> @@ -143,6 +143,17 @@ handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = Ol {reply, {error, <<"serivce stopped">>}, State} end; +%% 推送配置项目 +handle_call({push_arguments, Args}, _From, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid}) -> + case is_pid(ChannelPid) of + true -> + ok; + false -> + ok + end, + + ok; + %% 启动服务: 当前服务如果正常运行,则不允许重启 handle_call(start_service, _From, State = #state{running_status = ?STATUS_RUNNING}) -> {reply, {error, <<"service is running">>}, State}; diff --git a/apps/efka/src/efka_micro_service_sup.erl b/apps/efka/src/efka_micro_service_sup.erl index 3639ba9..243d2f4 100644 --- a/apps/efka/src/efka_micro_service_sup.erl +++ b/apps/efka/src/efka_micro_service_sup.erl @@ -52,7 +52,6 @@ init([]) -> Spec1 = child_spec(#micro_service{ service_id = <<"test1234">>, - service_name = <<"测试服务">>, from = <<"master">>, %% 工作目录 work_dir = <<"/usr/local/code/tmp/test/">>,