From 97cb06494327b523ab622d49ce29c87f963befe5 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 9 May 2025 11:40:19 +0800 Subject: [PATCH] fix --- apps/efka/src/client/efka_client.erl | 6 +- apps/efka/src/efka_tcp_channel.erl | 181 +++++++++++++++++---------- 2 files changed, 116 insertions(+), 71 deletions(-) diff --git a/apps/efka/src/client/efka_client.erl b/apps/efka/src/client/efka_client.erl index efa144d..ed4a635 100644 --- a/apps/efka/src/client/efka_client.erl +++ b/apps/efka/src/client/efka_client.erl @@ -20,7 +20,7 @@ %% 消息响应 -define(PACKET_RESPONSE, 16#02). %% 上传数据 --define(PACKET_PUSH, 16#02). +-define(PACKET_PUSH, 16#03). %% API -export([start_link/3]). @@ -218,7 +218,7 @@ handle_info({tcp, Socket, <>}, State = #state{s %% 收到efka推送的参数设置 handle_info({tcp, Socket, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) -> case jiffy:decode(Packet, [return_maps]) of - #{<<"id">> := Id, <<"method">> := <<"push_config">>, <<"params">> := ConfigJson} -> + #{<<"id">> := Id, <<"method">> := <<"push_config">>, <<"params">> := #{<<"config">> := ConfigJson, <<"timeout">> := Timeout}} -> Ref = make_ref(), ControllerPid ! {push_config, Ref, ConfigJson}, Reply = @@ -227,7 +227,7 @@ handle_info({tcp, Socket, <>}, State = #state{socke #{<<"id">> => Id, <<"result">> => <<"ok">>}; {push_config_reply, Ref, {error, Reason}} when is_binary(Reason) -> #{<<"id">> => Id, <<"error">> => #{<<"code">> => -1, <<"message">> => Reason}} - after 5000 -> + after Timeout * 1000 -> #{<<"id">> => Id, <<"error">> => #{<<"code">> => -2, <<"message">> => <<"timeout">>}} end, Packet = jiffy:encode(Reply, [force_utf8]), diff --git a/apps/efka/src/efka_tcp_channel.erl b/apps/efka/src/efka_tcp_channel.erl index 7f8b222..ddfd8dd 100644 --- a/apps/efka/src/efka_tcp_channel.erl +++ b/apps/efka/src/efka_tcp_channel.erl @@ -23,21 +23,11 @@ %% 消息类型 %% 服务注册 --define(PACKET_REGISTER, 16#00). +-define(PACKET_REQUEST, 16#01). %% 消息响应 --define(PACKET_RESPONSE, 16#01). - +-define(PACKET_RESPONSE, 16#02). %% 上传数据 --define(PACKET_METRIC_DATA, 16#02). -%% 微服务事件上报 --define(PACKET_EVENT, 16#03). - -%% 微服务从efka获取自身的采集项 --define(PACKET_REQUEST_CONFIG, 16#04). - -%% efka下发给微服务配置 --define(PACKET_PUSH_CONFIG, 16#10). --define(PACKET_INVOKE, 16#11). +-define(PACKET_PUSH, 16#03). -record(state, { packet_id = 1, @@ -103,11 +93,17 @@ handle_call(_Request, _From, State = #state{}) -> {stop, Reason :: term(), NewState :: #state{}}). %% 推送配置项目 handle_cast({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - ok = gen_tcp:send(Socket, <>), + PushConfig = #{<<"id">> => PacketId, <<"method">> => <<"push_config">>, <<"params">> => #{<<"config">> => ConfigJson, <<"timeout">> => 5}}, + Packet = jiffy:encode(PushConfig, [force_utf8]), + ok = gen_tcp:send(Socket, <>), + {noreply, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; handle_cast({invoke, Ref, ReceiverPid, Payload}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - ok = gen_tcp:send(Socket, <>), + PushConfig = #{<<"id">> => PacketId, <<"method">> => <<"invoke">>, <<"params">> => #{<<"payload">> => Payload, <<"timeout">> => 5}}, + Packet = jiffy:encode(PushConfig, [force_utf8]), + + ok = gen_tcp:send(Socket, <>), {noreply, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; handle_cast(_Request, State = #state{}) -> @@ -119,61 +115,22 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -%% 注册 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket}) -> - case efka_service:get_pid(ServiceId) of - undefined -> - lager:warning("[efka_tcp_channel] service_id: ~p, not running", [ServiceId]), - ok = gen_tcp:send(Socket, <>), - {stop, normal, State}; - Pid when is_pid(Pid) -> - case efka_service:attach_channel(Pid, self()) of - ok -> - ok = gen_tcp:send(Socket, <>), - {noreply, State#state{service_id = ServiceId, service_pid = Pid, is_registered = true}}; - {error, Error} -> - lager:warning("[efka_tcp_channel] service_id: ~p, attach_channel get error: ~p", [ServiceId, Error]), - ok = gen_tcp:send(Socket, <>), - {stop, normal, State} - end +%% 处理client主动的请求 +handle_info({tcp, Socket, <>}, State = #state{socket = Socket}) -> + Request = jiffy:decode(Data, [return_maps]), + case handle_request(Request, State) of + {ok, NewState} -> + {noreply, NewState}; + {stop, Reason, NewState} -> + {stop, Reason, NewState} end; -%% 请求参数 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> - {ok, ConfigJson} = efka_service:request_config(ServicePid), - ok = gen_tcp:send(Socket, <>), - {noreply, State}; +%% 处理client的响应 +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, inflight = Inflight}) -> + Response = jiffy:decode(Data, [return_maps]), + NInflight = reply(Response, Inflight), -%% 数据项 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> - efka_service:metric_data(ServicePid, DeviceUUID, Data), - {noreply, State}; - -%% Event事件 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> - efka_service:send_event(ServicePid, EventType, Params), - {noreply, State}; - -%% 收到端上的响应 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, inflight = Inflight}) -> - case maps:take(PacketId, Inflight) of - error -> - lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Response, PacketId]), - {noreply, State}; - {{ReceiverPid, Ref}, NInflight} -> - case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of - true -> - case Response of - <<1:8, Result/binary>> -> - ReceiverPid ! {channel_reply, Ref, {ok, Result}}; - <<0:8, Error/binary>> -> - ReceiverPid ! {channel_reply, Ref, {error, Error}} - end; - false -> - lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Response, PacketId]) - end, - {noreply, State#state{inflight = NInflight}} - end; + {noreply, State#state{inflight = NInflight}}; handle_info(Info, State = #state{}) -> lager:debug("[tcp_channel] get info: ~p", [Info]), @@ -201,9 +158,97 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== +%% 注册 +handle_request(#{<<"id">> := Id, <<"method">> := <<"register">>, <<"params">> := #{<<"service_id">> := ServiceId}}, State = #state{socket = Socket}) -> + case efka_service:get_pid(ServiceId) of + undefined -> + lager:warning("[efka_tcp_channel] service_id: ~p, not running", [ServiceId]), + Packet = json_error(Id, -1, <<"service not running">>), + ok = gen_tcp:send(Socket, <>), + {stop, normal, State}; + Pid when is_pid(Pid) -> + case efka_service:attach_channel(Pid, self()) of + ok -> + Packet = json_result(Id, <<"ok">>), + ok = gen_tcp:send(Socket, <>), + {ok, State#state{service_id = ServiceId, service_pid = Pid, is_registered = true}}; + {error, Error} -> + lager:warning("[efka_tcp_channel] service_id: ~p, attach_channel get error: ~p", [ServiceId, Error]), + Packet = json_error(Id, -1, Error), + ok = gen_tcp:send(Socket, <>), + {stop, normal, State} + end + end; + +%% 请求参数 +handle_request(#{<<"id">> := Id, <<"method">> := <<"request_config">>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> + {ok, ConfigJson} = efka_service:request_config(ServicePid), + Packet = json_result(Id, ConfigJson), + ok = gen_tcp:send(Socket, <>), + {ok, State}; + +%% 数据项 +handle_request(#{<<"id">> := 0, <<"method">> := <<"metric_data">>, <<"params">> := #{<<"device_uuid">> := DeviceUUID, <<"metric">> := Metric}}, State = #state{service_pid = ServicePid, is_registered = true}) -> + efka_service:metric_data(ServicePid, DeviceUUID, Metric), + {ok, State}; + +%% Event事件 +handle_request(#{<<"id">> := 0, <<"method">> := <<"event">>, <<"params">> := #{<<"event_type">> := EventType, <<"body">> := Body}}, State = #state{service_pid = ServicePid, is_registered = true}) -> + efka_service:send_event(ServicePid, EventType, Body), + {ok, State}. + %% 采用32位编码 -spec next_packet_id(PacketId :: integer()) -> NextPacketId :: integer(). next_packet_id(PacketId) when PacketId >= 4294967295 -> 1; next_packet_id(PacketId) -> - PacketId + 1. \ No newline at end of file + PacketId + 1. + +-spec json_result(Id :: integer(), Result :: term()) -> binary(). +json_result(Id, Result) when is_integer(Id) -> + Response = #{ + <<"id">> => Id, + <<"result">> => Result + }, + jiffy:encode(Response, [force_utf8]). + +-spec json_error(Id :: integer(), Code :: integer(), Message :: binary()) -> binary(). +json_error(Id, Code, Message) when is_integer(Id), is_integer(Code), is_binary(Message) -> + Response = #{ + <<"id">> => Id, + <<"error">> => #{ + <<"code">> => Code, + <<"message">> => Message + } + }, + jiffy:encode(Response, [force_utf8]). + +-spec reply(Resp :: map(), Inflight :: map()) -> NInflight :: map(). +reply(Resp = #{<<"id">> := Id, <<"result">> := Result}, Inflight) -> + case maps:take(Id, Inflight) of + error -> + lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, Id]), + Inflight; + {{ReceiverPid, Ref}, NInflight} -> + case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of + true -> + ReceiverPid ! {channel_reply, Ref, {ok, Result}}; + false -> + lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, Id]) + end, + NInflight + end; +reply(Resp = #{<<"id">> := Id, <<"error">> := #{<<"code">> := _Code, <<"message">> := Error}}, Inflight) -> + case maps:take(Id, Inflight) of + error -> + lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, Id]), + Inflight; + {{ReceiverPid, Ref}, NInflight} -> + case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of + true -> + ReceiverPid ! {channel_reply, Ref, {error, Error}}; + false -> + lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, Id]) + end, + NInflight + end. \ No newline at end of file