From 32161547d8904d92a892c9f04812098eae100e36 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 30 Apr 2025 10:40:10 +0800 Subject: [PATCH] fix efka_client --- apps/efka/src/efka_client.erl | 274 +++++++++++++--------------------- 1 file changed, 105 insertions(+), 169 deletions(-) diff --git a/apps/efka/src/efka_client.erl b/apps/efka/src/efka_client.erl index 55eda12..7f33671 100644 --- a/apps/efka/src/efka_client.erl +++ b/apps/efka/src/efka_client.erl @@ -26,8 +26,6 @@ -define(PACKET_TYPE_REGISTER, 16). %% 上传数据 -define(PACKET_TYPE_METRIC_DATA, 3). -%% 调用其他微服务 --define(PACKET_TYPE_INVOKE, 4). %% 消息响应 -define(PACKET_TYPE_RESPONSE, 7). %% efka下发给微服务参数 @@ -42,8 +40,6 @@ -define(PACKET_TYPE_REQUEST_METRIC, 10). %% 微服务从efka获取自身的参数 -define(PACKET_TYPE_REQUEST_PARAM, 12). -%% efka向微服务发送stream-call消息 --define(PACKET_TYPE_PUSH_STREAM_CALL, 11). %% 微服务事件上报 -define(PACKET_TYPE_EVENT, 15). @@ -63,9 +59,16 @@ port :: integer(), %% 请求后未完成的请求 inflight = #{} :: map(), - socket :: gen_tcp:socket() + socket :: gen_tcp:socket(), + controller_process :: pid() | undefined }). +test() -> + start_link(<<"test">>, "localhost", 18080). + +controller_process(ControllerPid) when is_pid(ControllerPid) -> + gen_server:call(?MODULE, {controller_process, ControllerPid}). + -spec send_metric_data(Fields :: list(), Tags :: #{}) -> {ok, Result :: any()} | {error, Reason :: any()}. send_metric_data(Fields, Tags) when is_list(Fields), is_map(Tags) -> {ok, Ref} = gen_server:call(?MODULE, {send_metric_data, self(), Fields, Tags}), @@ -130,14 +133,11 @@ await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) -> {error, timeout} end. -test() -> - start_link(<<"test">>, "localhost", 18080). - %% @doc Spawns the server and registers the local name (unique) --spec(start_link(RegisterName :: binary(), Host :: string(), Port :: integer()) -> +-spec(start_link(MicroServiceId :: binary(), Host :: string(), Port :: integer()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(RegisterName, Host, Port) when is_binary(RegisterName), is_list(Host), is_integer(Port) -> - gen_server:start_link(?MODULE, [RegisterName, Host, Port], []). +start_link(MicroServiceId, Host, Port) when is_binary(MicroServiceId), is_list(Host), is_integer(Port) -> + gen_server:start_link(?MODULE, [MicroServiceId, Host, Port], []). %%%=================================================================== %%% gen_server callbacks @@ -148,10 +148,10 @@ start_link(RegisterName, Host, Port) when is_binary(RegisterName), is_list(Host) -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([RegisterName, Host, Port]) -> +init([MicroServiceId, Host, Port]) -> {ok, Socket} = gen_tcp:connect(Host, Port, [binary, {packet, 4}, {active, true}]), ok = gen_tcp:controlling_process(Socket, self()), - case do_register(RegisterName, Socket) of + case do_register(MicroServiceId, Socket) of ok -> {ok, #state{packet_id = 1, host = Host, port = Port, socket = Socket}}; {error, Reason} -> @@ -159,24 +159,16 @@ init([RegisterName, Host, Port]) -> end. %% 执行到efka服务器的注册 -do_register(RegisterName, Socket) -> +do_register(MicroServiceId, Socket) -> PacketId = 0, - Body = #{<<"name">> => RegisterName}, - Packet = pack(PacketId, ?PACKET_TYPE_REGISTER, Body), + Packet = <>, ok = gen_tcp:send(Socket, Packet), lager:debug("[efka_client] will send packet: ~p", [Packet]), receive - {tcp, Socket, Data} -> - RegisterPacket = unpack(Data), - lager:debug("[efka_client] get register reply packet: ~p", [RegisterPacket]), - case RegisterPacket of - #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_RESPONSE, message = #{<<"c">> := 1, <<"r">> := <<"ok">>}} -> - ok; - #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_RESPONSE, message = #{<<"c">> := 0, <<"e">> := Error}} -> - {error, Error}; - _ -> - {error, invalid_register_packet} - end + {tcp, Socket, <>} -> + ok; + {tcp, Socket, <>} -> + {error, Error} after ?EFKA_REQUEST_TIMEOUT -> {error, timeout} @@ -192,33 +184,29 @@ do_register(RegisterName, Socket) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). -handle_call({send_metric_data, ReceiverPid, Fields, Tags}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - Body = #{<<"data">> => Fields, <<"tag">> => Tags}, - Packet = pack(PacketId, ?PACKET_TYPE_METRIC_DATA, Body), - ok = gen_tcp:send(Socket, Packet), - Ref = make_ref(), - {reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}; +%% 设置主动推送消息的接受进程 +handle_call({controller_process, ControllerPid}, _From, State) -> + {reply, ok, State#state{controller_process = ControllerPid}}; + +handle_call({send_metric_data, ReceiverPid, Fields, Tags}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> + Body = jiffy:encode(#{<<"data">> => Fields, <<"tag">> => Tags}, [force_utf8]), + Packet = <>, -handle_call({invoke_service, ReceiverPid, ToService, Message, Timeout}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - Body = #{ - <<"to">> => ToService, - <<"t">> => Timeout, - <<"m">> => Message - }, - Packet = pack(PacketId, ?PACKET_TYPE_INVOKE, Body), ok = gen_tcp:send(Socket, Packet), Ref = make_ref(), {reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}; handle_call({request_metric, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - Packet = pack(PacketId, ?PACKET_TYPE_REQUEST_METRIC), + Packet = <>, ok = gen_tcp:send(Socket, Packet), + Ref = make_ref(), {reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}; handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - Packet = pack(PacketId, ?PACKET_TYPE_REQUEST_PARAM), + Packet = <>, ok = gen_tcp:send(Socket, Packet), + Ref = make_ref(), {reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}. @@ -228,20 +216,18 @@ handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket, {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({send_log, Message}, State = #state{socket = Socket, packet_id = PacketId}) -> - Body = #{<<"l">> => Message}, - Packet = pack(PacketId, ?PACKET_TYPE_LOG, Body), +handle_cast({send_log, Message}, State = #state{socket = Socket}) -> + Body = jiffy:encode(#{<<"l">> => Message}, [force_utf8]), + Packet = <<0:32, ?PACKET_TYPE_LOG:8, Body/binary>>, ok = gen_tcp:send(Socket, Packet), - {noreply, State#state{packet_id = next_packet_id(PacketId)}}; + {noreply, State}; handle_cast({send_event, EventType, Params}, State = #state{socket = Socket}) -> - Body = #{ - <<"event_type">> => EventType, - <<"params">> => Params - }, - Packet = pack(0, ?PACKET_TYPE_EVENT, Body), + Body = jiffy:encode(#{<<"event_type">> => EventType, <<"params">> => Params}, [force_utf8]), + Packet = <<0:32, ?PACKET_TYPE_EVENT:8, Body/binary>>, ok = gen_tcp:send(Socket, Packet), + {noreply, State}; handle_cast(_Info, State = #state{}) -> @@ -253,99 +239,104 @@ handle_cast(_Info, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info({tcp, Socket, Data}, State = #state{socket = Socket}) -> - Packet = unpack(Data), - self() ! {handle_packet, Packet}, - {noreply, State}; -handle_info({tcp_closed, Socket}, State = #state{socket = Socket}) -> - {stop, tcp_closed, State}; %% 收到请求的响应 -handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_RESPONSE, message = Message}}, State = #state{inflight = Inflight}) -> +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, inflight = Inflight}) -> case maps:take(PacketId, Inflight) of error -> {noreply, State}; {{Ref, ReceiverPid}, NInflight} -> case Message of - #{<<"c">> := 1, <<"r">> := Result} -> + <<1:8, Result/binary>> -> ReceiverPid ! {response, Ref, {ok, Result}}; - #{<<"c">> := 0, <<"e">> := Error} -> + <<0:8, Error/binary>> -> ReceiverPid ! {response, Ref, {error, Error}} end, {noreply, State#state{inflight = NInflight}} end; %% 收到efka推送的参数设置 -handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_PUSH_PARAM, message = Params}}, - State = #state{socket = Socket}) when is_map(Params) -> - - Message = case handle_param(Params) of - ok -> - #{<<"c">> => 1, <<"r">> => <<"ok">>}; - {error, Reason} -> - #{<<"c">> => 0, <<"e">> => Reason} +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) when is_map(Params) -> + Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of + true -> + Ref = make_ref(), + ControllerPid ! {push_param, Ref, Params}, + receive + {push_param_reply, Ref, ok} -> + <<1:8>>; + {push_param_reply, Ref, {error, Reason}} when is_binary(Reason) -> + <<0:8, Reason/binary>> + after 5000 -> + <<0:8, "服务执行超时"/utf8>> + end; + false -> + <<0:8, "处理进程异常"/utf8>> end, - Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message), + Packet = <>, ok = gen_tcp:send(Socket, Packet), {noreply, State}; %% 收到efka推送的采集项消息 -handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_PUSH_METRIC, message = Metrics}}, - State = #state{socket = Socket}) when is_list(Metrics) -> - - Message = case handle_metric(Metrics) of - ok -> - #{<<"c">> => 1, <<"r">> => <<"ok">>}; - {error, Reason} when is_binary(Reason) -> - #{<<"c">> => 0, <<"e">> => Reason} +handle_info({tcp, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) when is_list(Metrics) -> + Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of + true -> + Ref = make_ref(), + ControllerPid ! {push_metric, Ref, Metrics}, + receive + {push_metric_reply, Ref, ok} -> + <<1:8>>; + {push_metric_reply, Ref, {error, Reason}} when is_binary(Reason) -> + <<0:8, Reason/binary>> + after 5000 -> + <<0:8, "服务执行超时"/utf8>> + end; + false -> + <<0:8, "处理进程异常"/utf8>> end, - Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message), + + Packet = <>, ok = gen_tcp:send(Socket, Packet), {noreply, State}; %% 收到设备状态的轮询请求 -handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_POLL, message = Command}}, State = #state{socket = Socket}) -> - Message = handle_poll_command(Command), - Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message), - ok = gen_tcp:send(Socket, Packet), +%% #{ +%% <<"c">> => 1, +%% <<"r">> => #{ +%% <<"edge_status">> => Status, +%% <<"message">> => maps:get(Status, StatusMap) +%% } +%% } - {noreply, State}; - -%% 收到efka的stream-call消息 -handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_PUSH_STREAM_CALL, - message = Msg = #{<<"service_name">> := ServiceName, <<"data">> := Data, <<"tag">> := Tag}}}, State = #state{socket = Socket}) -> - - Message = case handle_stream_call(ServiceName, Data, Tag) of - {continue, NewServiceName, NewData, NewTag} -> - #{ - <<"c">> => 1, - <<"r">> => Msg#{<<"service_name">> := NewServiceName, <<"data">> => NewData, <<"tag">> => NewTag}, - <<"k">> => true - }; - %% 处理到当前节点为止,不继续往下传递 - {break, NewServiceName, NewData, NewTag} -> - #{ - <<"c">> => 1, - <<"r">> => Msg#{<<"service_name">> := NewServiceName, <<"data">> => NewData, <<"tag">> => NewTag}, - <<"k">> => false - }; - error -> - #{ - <<"c">> => 0, - <<"r">> => Msg, - <<"k">> => true - } +handle_info({tcp, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) -> + Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of + true -> + Ref = make_ref(), + ControllerPid ! {poll, Ref, Command}, + receive + {poll_reply, Ref, {ok, Reply}} when is_binary(Reply) -> + <<1:8, Reply/binary>>; + {poll_reply, Ref, {error, Reason}} when is_binary(Reason) -> + <<0:8, Reason/binary>> + after 5000 -> + <<0:8, "服务执行超时"/utf8>> + end; + false -> + <<0:8, "处理进程异常"/utf8>> end, - Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message), + Packet = <>, ok = gen_tcp:send(Socket, Packet), {noreply, State}; %% 其他消息为非法消息 -handle_info({handle_packet, _Packet}, State = #state{}) -> - {noreply, State}. +handle_info({tcp, Socket, Packet}, State = #state{socket = Socket}) -> + lager:debug("[efka_client] get unknown packet: ~p", [Packet]), + {noreply, State}; + +handle_info({tcp_closed, Socket}, State = #state{socket = Socket}) -> + {stop, tcp_closed, State}. %% @private %% @doc This function is called by a gen_server when it is about to @@ -370,68 +361,13 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== +%% 采用32位编码 -spec next_packet_id(PacketId :: integer()) -> NextPacketId :: integer(). -next_packet_id(PacketId) when PacketId >= 65535 -> +next_packet_id(PacketId) when PacketId >= 4294967295 -> 0; next_packet_id(PacketId) -> PacketId + 1. --spec pack(PacketId :: integer(), Type :: integer()) -> binary(). -pack(PacketId, Type) when is_integer(PacketId), is_integer(Type) -> - <>. - --spec pack(PacketId :: integer(), Type :: integer(), Body :: map()) -> binary(). -pack(PacketId, Type, Body) when is_integer(PacketId), is_integer(Type), is_map(Body) -> - Message = iolist_to_binary(jiffy:encode(Body, [force_utf8])), - <>. - --spec unpack(binary()) -> #efka_packet{}. -unpack(<>) -> - Message = catch jiffy:decode(Body, [return_maps]), - #efka_packet{packet_id = PacketId, type = Type, message = Message}. - %%%=================================================================== %%% simple callbacks -%%%=================================================================== - -handle_poll_command(#{<<"device_uuid">> := DeviceUUID, <<"command">> := <<"query_status">>}) when is_binary(DeviceUUID) -> - case power_device:get_pid(DeviceUUID) of - undefined -> - #{ - <<"c">> => 1, - <<"r">> => #{ - <<"edge_status">> => -1, - <<"message">> => <<"设备信息不存在"/utf8>> - } - }; - Pid -> - StatusMap = #{ - 0 => <<"离线"/utf8>>, - 1 => <<"在线"/utf8>> - }, - {ok, Status} = power_device:poll_status(Pid), - #{ - <<"c">> => 1, - <<"r">> => #{ - <<"edge_status">> => Status, - <<"message">> => maps:get(Status, StatusMap) - } - } - end. - --spec handle_param(Params :: map()) -> ok | {error, Reason :: binary()}. -handle_param(Params) when is_map(Params) -> - power_gateway_args:push_param(Params), - ok. - --spec handle_metric(Metric :: list()) -> ok | {error, Reason :: binary()}. -handle_metric(Metric) when is_list(Metric) -> - power_gateway_args:push_metric(Metric), - ok. - --spec handle_stream_call(ServiceName :: binary(), Fields :: list(), Tag :: map()) -> - {continue, NewServiceName :: binary(), NewFields :: list(), NewTag :: map()} - | {break, NewServiceName :: binary(), NewFields :: list(), NewTag :: map()} - | error. -handle_stream_call(ServiceName, Fields, Tag) when is_binary(ServiceName), is_list(Fields), is_map(Tag) -> - {continue, ServiceName, Fields, Tag}. \ No newline at end of file +%%%=================================================================== \ No newline at end of file