From ad5f06be0e9b5aae1c91452f0e40041a8bc611a8 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 9 May 2025 00:06:47 +0800 Subject: [PATCH] fix --- apps/efka/src/client/efka_client.erl | 49 ++++++++++++---------------- apps/efka/src/efka_tcp_channel.erl | 28 ++++++++-------- 2 files changed, 35 insertions(+), 42 deletions(-) diff --git a/apps/efka/src/client/efka_client.erl b/apps/efka/src/client/efka_client.erl index 45201b1..ae1dfb4 100644 --- a/apps/efka/src/client/efka_client.erl +++ b/apps/efka/src/client/efka_client.erl @@ -16,20 +16,21 @@ %% 消息类型 %% 服务注册 --define(PACKET_REGISTER, 16). -%% 上传数据 --define(PACKET_METRIC_DATA, 3). +-define(PACKET_REGISTER, 16#00). %% 消息响应 --define(PACKET_RESPONSE, 7). +-define(PACKET_RESPONSE, 16#01). -%% efka下发给微服务配置 --define(PACKET_PUSH_CONFIG, 5). +%% 上传数据 +-define(PACKET_METRIC_DATA, 16#02). +%% 微服务事件上报 +-define(PACKET_EVENT, 16#03). %% 微服务从efka获取自身的采集项 --define(PACKET_REQUEST_CONFIG, 10). +-define(PACKET_REQUEST_CONFIG, 16#04). -%% 微服务事件上报 --define(PACKET_EVENT, 15). +%% efka下发给微服务配置 +-define(PACKET_PUSH_CONFIG, 16#10). +-define(PACKET_INVOKE, 16#11). %% API -export([start_link/3]). @@ -66,11 +67,15 @@ send_metric_data(DeviceUUID, Measurement, Tags, Fields) when is_binary(DeviceUUI -spec request_config() -> {ok, Result :: list() | map()} | {error, Reason :: any()}. request_config() -> {ok, Ref} = gen_server:call(?MODULE, {request_config, self()}), - case await_reply(Ref, ?EFKA_REQUEST_TIMEOUT) of - {ok, Reply} -> - {ok, jiffy:decode(Reply, [return_maps])}; - Error -> - Error + receive + {response, Ref, {ok, Reply}} -> + Config = jiffy:decode(Reply, [return_maps]), + {ok, Config}; + {response, Ref, {error, Reason}} -> + {error, Reason} + after + ?EFKA_REQUEST_TIMEOUT -> + {error, timeout} end. -spec device_offline(DeviceUUID :: binary()) -> no_return(). @@ -89,18 +94,6 @@ send_event(EventType, Params) when is_integer(EventType), is_binary(Params) -> %%% API %%%=================================================================== --spec await_reply(Ref :: reference(), Timeout :: integer()) -> {ok, Result :: any()} | {error, Reason :: any()}. -await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) -> - receive - {response, Ref, {ok, Reply}} -> - {ok, Reply}; - {response, Ref, {error, Reason}} -> - {error, Reason} - after - Timeout -> - {error, timeout} - end. - %% @doc Spawns the server and registers the local name (unique) -spec(start_link(ServiceId :: binary(), Host :: string(), Port :: integer()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). @@ -169,14 +162,14 @@ handle_cast({send_metric_data, DeviceUUID, Measurement, Tags, Fields}, State = # Body = efka_point:normalized(Point), Len = byte_size(DeviceUUID), - Packet = <>, + Packet = <>, ok = gen_tcp:send(Socket, Packet), {noreply, State}; %% done handle_cast({send_event, EventType, Params}, State = #state{socket = Socket}) -> - Packet = <>, + Packet = <>, ok = gen_tcp:send(Socket, Packet), {noreply, State}; diff --git a/apps/efka/src/efka_tcp_channel.erl b/apps/efka/src/efka_tcp_channel.erl index 2dfec37..f588ce6 100644 --- a/apps/efka/src/efka_tcp_channel.erl +++ b/apps/efka/src/efka_tcp_channel.erl @@ -23,21 +23,21 @@ %% 消息类型 %% 服务注册 --define(PACKET_REGISTER, 16). -%% 上传数据 --define(PACKET_METRIC_DATA, 3). +-define(PACKET_REGISTER, 16#00). %% 消息响应 --define(PACKET_RESPONSE, 7). +-define(PACKET_RESPONSE, 16#01). + +%% 上传数据 +-define(PACKET_METRIC_DATA, 16#02). +%% 微服务事件上报 +-define(PACKET_EVENT, 16#03). + +%% 微服务从efka获取自身的采集项 +-define(PACKET_REQUEST_CONFIG, 16#04). %% efka下发给微服务配置 --define(PACKET_PUSH_CONFIG, 5). - --define(PACKET_INVOKE, 6). - --define(PACKET_REQUEST_CONFIG, 10). - -%% 微服务事件上报 --define(PACKET_EVENT, 15). +-define(PACKET_PUSH_CONFIG, 16#10). +-define(PACKET_INVOKE, 16#11). -record(state, { packet_id = 1, @@ -145,12 +145,12 @@ handle_info({tcp, Socket, <>}, State = #s {noreply, State}; %% 数据项 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> efka_service:metric_data(ServicePid, DeviceUUID, Data), {noreply, State}; %% Event事件 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> efka_service:send_event(ServicePid, EventType, Params), {noreply, State};