diff --git a/apps/efka/src/client/efka_client.erl b/apps/efka/src/client/efka_client.erl index ae1dfb4..efa144d 100644 --- a/apps/efka/src/client/efka_client.erl +++ b/apps/efka/src/client/efka_client.erl @@ -16,21 +16,11 @@ %% 消息类型 %% 服务注册 --define(PACKET_REGISTER, 16#00). +-define(PACKET_REQUEST, 16#01). %% 消息响应 --define(PACKET_RESPONSE, 16#01). - +-define(PACKET_RESPONSE, 16#02). %% 上传数据 --define(PACKET_METRIC_DATA, 16#02). -%% 微服务事件上报 --define(PACKET_EVENT, 16#03). - -%% 微服务从efka获取自身的采集项 --define(PACKET_REQUEST_CONFIG, 16#04). - -%% efka下发给微服务配置 --define(PACKET_PUSH_CONFIG, 16#10). --define(PACKET_INVOKE, 16#11). +-define(PACKET_PUSH, 16#02). %% API -export([start_link/3]). @@ -73,9 +63,8 @@ request_config() -> {ok, Config}; {response, Ref, {error, Reason}} -> {error, Reason} - after - ?EFKA_REQUEST_TIMEOUT -> - {error, timeout} + after 5000 -> + {error, timeout} end. -spec device_offline(DeviceUUID :: binary()) -> no_return(). @@ -114,17 +103,21 @@ init([ServiceId, Host, Port]) -> ok = gen_tcp:controlling_process(Socket, self()), PacketId = 1, - Packet = <>, - ok = gen_tcp:send(Socket, Packet), + Register = #{<<"id">> => PacketId, <<"method">> => <<"register">>, <<"params">> => #{<<"service_id">> => ServiceId}}, + Packet = jiffy:encode(Register, [force_utf8]), + + ok = gen_tcp:send(Socket, <>), lager:debug("[efka_client] will send packet: ~p", [Packet]), receive - {tcp, Socket, <>} -> - {ok, #state{packet_id = PacketId + 1, host = Host, port = Port, socket = Socket}}; - {tcp, Socket, <>} -> - {stop, Error} - after - ?EFKA_REQUEST_TIMEOUT -> - {stop, register_timeout} + {tcp, Socket, <>} -> + case catch jiffy:decode(Data, [return_maps]) of + #{<<"id">> := PacketId, <<"result">> := <<"ok">>} -> + {ok, #state{packet_id = PacketId + 1, host = Host, port = Port, socket = Socket}}; + #{<<"id">> := PacketId, <<"error">> := #{<<"code">> := Code, <<"message">> := Error}} -> + {stop, {error, {Code, Error}}} + end + after 5000 -> + {stop, register_timeout} end. %% @private @@ -143,8 +136,9 @@ handle_call({controller_process, ControllerPid}, _From, State) -> %% done handle_call({request_config, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - Packet = <>, - ok = gen_tcp:send(Socket, Packet), + RequestConfig = #{<<"id">> => PacketId, <<"method">> => <<"request_config">>}, + Packet = jiffy:encode(RequestConfig, [force_utf8]), + ok = gen_tcp:send(Socket, <>), Ref = make_ref(), {reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}. @@ -160,17 +154,33 @@ handle_cast({send_metric_data, DeviceUUID, Measurement, Tags, Fields}, State = # %% 基于Line Protocol实现数据的传输 Point = efka_point:new(Measurement, Tags, Fields, efka_util:timestamp()), Body = efka_point:normalized(Point), - Len = byte_size(DeviceUUID), - Packet = <>, - ok = gen_tcp:send(Socket, Packet), + MetricData = #{ + <<"id">> => 0, + <<"method">> => <<"metric_data">>, + <<"params">> => #{ + <<"device_uuid">> => DeviceUUID, + <<"metric">> => Body + } + }, + Packet = jiffy:encode(MetricData, [force_utf8]), + ok = gen_tcp:send(Socket, <>), {noreply, State}; %% done -handle_cast({send_event, EventType, Params}, State = #state{socket = Socket}) -> - Packet = <>, - ok = gen_tcp:send(Socket, Packet), +handle_cast({send_event, EventType, Body}, State = #state{socket = Socket}) -> + Event = #{ + <<"id">> => 0, + <<"method">> => <<"event">>, + <<"params">> => #{ + <<"event_type">> => EventType, + <<"body">> => Body + } + }, + + Packet = jiffy:encode(Event, [force_utf8]), + ok = gen_tcp:send(Socket, <>), {noreply, State}; @@ -185,41 +195,61 @@ handle_cast(_Info, State = #state{}) -> {stop, Reason :: term(), NewState :: #state{}}). %% 收到请求的响应 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, inflight = Inflight}) -> - case maps:take(PacketId, Inflight) of - error -> - {noreply, State}; - {{Ref, ReceiverPid}, NInflight} -> - case Message of - <<1:8, Result/binary>> -> - ReceiverPid ! {response, Ref, {ok, Result}}; - <<0:8, Error/binary>> -> - ReceiverPid ! {response, Ref, {error, Error}} - end, - {noreply, State#state{inflight = NInflight}} +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, inflight = Inflight}) -> + case jiffy:decode(Packet, [return_maps]) of + #{<<"id">> := Id, <<"result">> := Result} -> + case maps:take(Id, Inflight) of + error -> + {noreply, State}; + {{Ref, ReceiverPid}, NInflight} -> + ReceiverPid ! {response, Ref, {ok, Result}}, + {noreply, State#state{inflight = NInflight}} + end; + #{<<"id">> := Id, <<"error">> := #{<<"code">> := Code, <<"message">> := Message}} -> + case maps:take(Id, Inflight) of + error -> + {noreply, State}; + {{Ref, ReceiverPid}, NInflight} -> + ReceiverPid ! {response, Ref, {error, {Code, Message}}}, + {noreply, State#state{inflight = NInflight}} + end end; %% 收到efka推送的参数设置 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) -> - Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of - true -> - Ref = make_ref(), - ControllerPid ! {push_config, Ref, ConfigJson}, - receive - {push_config_reply, Ref, ok} -> - <<1:8>>; - {push_config_reply, Ref, {error, Reason}} when is_binary(Reason) -> - <<0:8, Reason/binary>> - after 5000 -> - <<0:8, "服务执行超时"/utf8>> - end; - false -> - <<0:8, "处理进程异常"/utf8>> - end, - Packet = <>, - ok = gen_tcp:send(Socket, Packet), +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) -> + case jiffy:decode(Packet, [return_maps]) of + #{<<"id">> := Id, <<"method">> := <<"push_config">>, <<"params">> := ConfigJson} -> + Ref = make_ref(), + ControllerPid ! {push_config, Ref, ConfigJson}, + Reply = + receive + {push_config_reply, Ref, ok} -> + #{<<"id">> => Id, <<"result">> => <<"ok">>}; + {push_config_reply, Ref, {error, Reason}} when is_binary(Reason) -> + #{<<"id">> => Id, <<"error">> => #{<<"code">> => -1, <<"message">> => Reason}} + after 5000 -> + #{<<"id">> => Id, <<"error">> => #{<<"code">> => -2, <<"message">> => <<"timeout">>}} + end, + Packet = jiffy:encode(Reply, [force_utf8]), + ok = gen_tcp:send(Socket, Packet), + {noreply, State}; - {noreply, State}; + #{<<"id">> := Id, <<"method">> := <<"invoke">>, <<"params">> := #{<<"payload">> := Payload, <<"timeout">> := Timeout}} -> + Ref = make_ref(), + ControllerPid ! {invoke, Ref, Payload}, + Reply = + receive + {invoke_reply, Ref, {ok, Result}} -> + #{<<"id">> => Id, <<"result">> => Result}; + {invoke_reply, Ref, {error, Reason}} when is_binary(Reason) -> + #{<<"id">> => Id, <<"error">> => #{<<"code">> => -1, <<"message">> => Reason}} + after Timeout * 1000 -> + #{<<"id">> => Id, <<"error">> => #{<<"code">> => -2, <<"message">> => <<"invoke timeout">>}} + end, + Packet = jiffy:encode(Reply, [force_utf8]), + ok = gen_tcp:send(Socket, Packet), + {noreply, State} + end; %% 其他消息为非法消息 handle_info({tcp, Socket, Packet}, State = #state{socket = Socket}) -> diff --git a/apps/efka/src/efka_tcp_channel.erl b/apps/efka/src/efka_tcp_channel.erl index f588ce6..7f8b222 100644 --- a/apps/efka/src/efka_tcp_channel.erl +++ b/apps/efka/src/efka_tcp_channel.erl @@ -145,7 +145,7 @@ handle_info({tcp, Socket, <>}, State = #s {noreply, State}; %% 数据项 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> efka_service:metric_data(ServicePid, DeviceUUID, Data), {noreply, State};