fix efka_client

This commit is contained in:
anlicheng 2025-04-30 10:40:10 +08:00
parent 7a9d18cf51
commit 32161547d8

View File

@ -26,8 +26,6 @@
-define(PACKET_TYPE_REGISTER, 16).
%%
-define(PACKET_TYPE_METRIC_DATA, 3).
%%
-define(PACKET_TYPE_INVOKE, 4).
%%
-define(PACKET_TYPE_RESPONSE, 7).
%% efka下发给微服务参数
@ -42,8 +40,6 @@
-define(PACKET_TYPE_REQUEST_METRIC, 10).
%% efka获取自身的参数
-define(PACKET_TYPE_REQUEST_PARAM, 12).
%% efka向微服务发送stream-call消息
-define(PACKET_TYPE_PUSH_STREAM_CALL, 11).
%%
-define(PACKET_TYPE_EVENT, 15).
@ -63,9 +59,16 @@
port :: integer(),
%%
inflight = #{} :: map(),
socket :: gen_tcp:socket()
socket :: gen_tcp:socket(),
controller_process :: pid() | undefined
}).
test() ->
start_link(<<"test">>, "localhost", 18080).
controller_process(ControllerPid) when is_pid(ControllerPid) ->
gen_server:call(?MODULE, {controller_process, ControllerPid}).
-spec send_metric_data(Fields :: list(), Tags :: #{}) -> {ok, Result :: any()} | {error, Reason :: any()}.
send_metric_data(Fields, Tags) when is_list(Fields), is_map(Tags) ->
{ok, Ref} = gen_server:call(?MODULE, {send_metric_data, self(), Fields, Tags}),
@ -130,14 +133,11 @@ await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) ->
{error, timeout}
end.
test() ->
start_link(<<"test">>, "localhost", 18080).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(RegisterName :: binary(), Host :: string(), Port :: integer()) ->
-spec(start_link(MicroServiceId :: binary(), Host :: string(), Port :: integer()) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(RegisterName, Host, Port) when is_binary(RegisterName), is_list(Host), is_integer(Port) ->
gen_server:start_link(?MODULE, [RegisterName, Host, Port], []).
start_link(MicroServiceId, Host, Port) when is_binary(MicroServiceId), is_list(Host), is_integer(Port) ->
gen_server:start_link(?MODULE, [MicroServiceId, Host, Port], []).
%%%===================================================================
%%% gen_server callbacks
@ -148,10 +148,10 @@ start_link(RegisterName, Host, Port) when is_binary(RegisterName), is_list(Host)
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([RegisterName, Host, Port]) ->
init([MicroServiceId, Host, Port]) ->
{ok, Socket} = gen_tcp:connect(Host, Port, [binary, {packet, 4}, {active, true}]),
ok = gen_tcp:controlling_process(Socket, self()),
case do_register(RegisterName, Socket) of
case do_register(MicroServiceId, Socket) of
ok ->
{ok, #state{packet_id = 1, host = Host, port = Port, socket = Socket}};
{error, Reason} ->
@ -159,24 +159,16 @@ init([RegisterName, Host, Port]) ->
end.
%% efka服务器的注册
do_register(RegisterName, Socket) ->
do_register(MicroServiceId, Socket) ->
PacketId = 0,
Body = #{<<"name">> => RegisterName},
Packet = pack(PacketId, ?PACKET_TYPE_REGISTER, Body),
Packet = <<PacketId:32, ?PACKET_TYPE_REGISTER:8, MicroServiceId/binary>>,
ok = gen_tcp:send(Socket, Packet),
lager:debug("[efka_client] will send packet: ~p", [Packet]),
receive
{tcp, Socket, Data} ->
RegisterPacket = unpack(Data),
lager:debug("[efka_client] get register reply packet: ~p", [RegisterPacket]),
case RegisterPacket of
#efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_RESPONSE, message = #{<<"c">> := 1, <<"r">> := <<"ok">>}} ->
{tcp, Socket, <<PacketId:32, ?PACKET_TYPE_RESPONSE, 1:8>>} ->
ok;
#efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_RESPONSE, message = #{<<"c">> := 0, <<"e">> := Error}} ->
{error, Error};
_ ->
{error, invalid_register_packet}
end
{tcp, Socket, <<PacketId:32, ?PACKET_TYPE_RESPONSE, 0:8, Error/binary>>} ->
{error, Error}
after
?EFKA_REQUEST_TIMEOUT ->
{error, timeout}
@ -192,33 +184,29 @@ do_register(RegisterName, Socket) ->
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call({send_metric_data, ReceiverPid, Fields, Tags}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
Body = #{<<"data">> => Fields, <<"tag">> => Tags},
Packet = pack(PacketId, ?PACKET_TYPE_METRIC_DATA, Body),
ok = gen_tcp:send(Socket, Packet),
Ref = make_ref(),
{reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}};
%%
handle_call({controller_process, ControllerPid}, _From, State) ->
{reply, ok, State#state{controller_process = ControllerPid}};
handle_call({send_metric_data, ReceiverPid, Fields, Tags}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
Body = jiffy:encode(#{<<"data">> => Fields, <<"tag">> => Tags}, [force_utf8]),
Packet = <<PacketId:32, ?PACKET_TYPE_METRIC_DATA, Body/binary>>,
handle_call({invoke_service, ReceiverPid, ToService, Message, Timeout}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
Body = #{
<<"to">> => ToService,
<<"t">> => Timeout,
<<"m">> => Message
},
Packet = pack(PacketId, ?PACKET_TYPE_INVOKE, Body),
ok = gen_tcp:send(Socket, Packet),
Ref = make_ref(),
{reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}};
handle_call({request_metric, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
Packet = pack(PacketId, ?PACKET_TYPE_REQUEST_METRIC),
Packet = <<PacketId:32, ?PACKET_TYPE_REQUEST_METRIC:8>>,
ok = gen_tcp:send(Socket, Packet),
Ref = make_ref(),
{reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}};
handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
Packet = pack(PacketId, ?PACKET_TYPE_REQUEST_PARAM),
Packet = <<PacketId:32, ?PACKET_TYPE_REQUEST_PARAM:8>>,
ok = gen_tcp:send(Socket, Packet),
Ref = make_ref(),
{reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}.
@ -228,20 +216,18 @@ handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket,
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({send_log, Message}, State = #state{socket = Socket, packet_id = PacketId}) ->
Body = #{<<"l">> => Message},
Packet = pack(PacketId, ?PACKET_TYPE_LOG, Body),
handle_cast({send_log, Message}, State = #state{socket = Socket}) ->
Body = jiffy:encode(#{<<"l">> => Message}, [force_utf8]),
Packet = <<0:32, ?PACKET_TYPE_LOG:8, Body/binary>>,
ok = gen_tcp:send(Socket, Packet),
{noreply, State#state{packet_id = next_packet_id(PacketId)}};
{noreply, State};
handle_cast({send_event, EventType, Params}, State = #state{socket = Socket}) ->
Body = #{
<<"event_type">> => EventType,
<<"params">> => Params
},
Packet = pack(0, ?PACKET_TYPE_EVENT, Body),
Body = jiffy:encode(#{<<"event_type">> => EventType, <<"params">> => Params}, [force_utf8]),
Packet = <<0:32, ?PACKET_TYPE_EVENT:8, Body/binary>>,
ok = gen_tcp:send(Socket, Packet),
{noreply, State};
handle_cast(_Info, State = #state{}) ->
@ -253,99 +239,104 @@ handle_cast(_Info, State = #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({tcp, Socket, Data}, State = #state{socket = Socket}) ->
Packet = unpack(Data),
self() ! {handle_packet, Packet},
{noreply, State};
handle_info({tcp_closed, Socket}, State = #state{socket = Socket}) ->
{stop, tcp_closed, State};
%%
handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_RESPONSE, message = Message}}, State = #state{inflight = Inflight}) ->
handle_info({tcp, Socket, <<PacketId:32, ?PACKET_TYPE_RESPONSE:8, Message/binary>>}, State = #state{socket = Socket, inflight = Inflight}) ->
case maps:take(PacketId, Inflight) of
error ->
{noreply, State};
{{Ref, ReceiverPid}, NInflight} ->
case Message of
#{<<"c">> := 1, <<"r">> := Result} ->
<<1:8, Result/binary>> ->
ReceiverPid ! {response, Ref, {ok, Result}};
#{<<"c">> := 0, <<"e">> := Error} ->
<<0:8, Error/binary>> ->
ReceiverPid ! {response, Ref, {error, Error}}
end,
{noreply, State#state{inflight = NInflight}}
end;
%% efka推送的参数设置
handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_PUSH_PARAM, message = Params}},
State = #state{socket = Socket}) when is_map(Params) ->
Message = case handle_param(Params) of
ok ->
#{<<"c">> => 1, <<"r">> => <<"ok">>};
{error, Reason} ->
#{<<"c">> => 0, <<"e">> => Reason}
handle_info({tcp, Socket, <<PacketId:32, ?PACKET_TYPE_PUSH_PARAM:8, Params/binary>>}, State = #state{socket = Socket, controller_process = ControllerPid}) when is_map(Params) ->
Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of
true ->
Ref = make_ref(),
ControllerPid ! {push_param, Ref, Params},
receive
{push_param_reply, Ref, ok} ->
<<1:8>>;
{push_param_reply, Ref, {error, Reason}} when is_binary(Reason) ->
<<0:8, Reason/binary>>
after 5000 ->
<<0:8, "服务执行超时"/utf8>>
end;
false ->
<<0:8, "处理进程异常"/utf8>>
end,
Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message),
Packet = <<PacketId:32, ?PACKET_TYPE_RESPONSE:8, Message/binary>>,
ok = gen_tcp:send(Socket, Packet),
{noreply, State};
%% efka推送的采集项消息
handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_PUSH_METRIC, message = Metrics}},
State = #state{socket = Socket}) when is_list(Metrics) ->
Message = case handle_metric(Metrics) of
ok ->
#{<<"c">> => 1, <<"r">> => <<"ok">>};
{error, Reason} when is_binary(Reason) ->
#{<<"c">> => 0, <<"e">> => Reason}
handle_info({tcp, <<PacketId:32, ?PACKET_TYPE_PUSH_METRIC:8, Metrics/binary>>}, State = #state{socket = Socket, controller_process = ControllerPid}) when is_list(Metrics) ->
Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of
true ->
Ref = make_ref(),
ControllerPid ! {push_metric, Ref, Metrics},
receive
{push_metric_reply, Ref, ok} ->
<<1:8>>;
{push_metric_reply, Ref, {error, Reason}} when is_binary(Reason) ->
<<0:8, Reason/binary>>
after 5000 ->
<<0:8, "服务执行超时"/utf8>>
end;
false ->
<<0:8, "处理进程异常"/utf8>>
end,
Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message),
Packet = <<PacketId:32, ?PACKET_TYPE_RESPONSE:8, Message/binary>>,
ok = gen_tcp:send(Socket, Packet),
{noreply, State};
%%
handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_POLL, message = Command}}, State = #state{socket = Socket}) ->
Message = handle_poll_command(Command),
Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message),
ok = gen_tcp:send(Socket, Packet),
%% #{
%% <<"c">> => 1,
%% <<"r">> => #{
%% <<"edge_status">> => Status,
%% <<"message">> => maps:get(Status, StatusMap)
%% }
%% }
{noreply, State};
%% efka的stream-call消息
handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_PUSH_STREAM_CALL,
message = Msg = #{<<"service_name">> := ServiceName, <<"data">> := Data, <<"tag">> := Tag}}}, State = #state{socket = Socket}) ->
Message = case handle_stream_call(ServiceName, Data, Tag) of
{continue, NewServiceName, NewData, NewTag} ->
#{
<<"c">> => 1,
<<"r">> => Msg#{<<"service_name">> := NewServiceName, <<"data">> => NewData, <<"tag">> => NewTag},
<<"k">> => true
};
%%
{break, NewServiceName, NewData, NewTag} ->
#{
<<"c">> => 1,
<<"r">> => Msg#{<<"service_name">> := NewServiceName, <<"data">> => NewData, <<"tag">> => NewTag},
<<"k">> => false
};
error ->
#{
<<"c">> => 0,
<<"r">> => Msg,
<<"k">> => true
}
handle_info({tcp, <<PacketId:32, ?PACKET_TYPE_POLL:8, Command/binary>>}, State = #state{socket = Socket, controller_process = ControllerPid}) ->
Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of
true ->
Ref = make_ref(),
ControllerPid ! {poll, Ref, Command},
receive
{poll_reply, Ref, {ok, Reply}} when is_binary(Reply) ->
<<1:8, Reply/binary>>;
{poll_reply, Ref, {error, Reason}} when is_binary(Reason) ->
<<0:8, Reason/binary>>
after 5000 ->
<<0:8, "服务执行超时"/utf8>>
end;
false ->
<<0:8, "处理进程异常"/utf8>>
end,
Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message),
Packet = <<PacketId:32, ?PACKET_TYPE_RESPONSE:8, Message/binary>>,
ok = gen_tcp:send(Socket, Packet),
{noreply, State};
%%
handle_info({handle_packet, _Packet}, State = #state{}) ->
{noreply, State}.
handle_info({tcp, Socket, Packet}, State = #state{socket = Socket}) ->
lager:debug("[efka_client] get unknown packet: ~p", [Packet]),
{noreply, State};
handle_info({tcp_closed, Socket}, State = #state{socket = Socket}) ->
{stop, tcp_closed, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
@ -370,68 +361,13 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
%%% Internal functions
%%%===================================================================
%% 32
-spec next_packet_id(PacketId :: integer()) -> NextPacketId :: integer().
next_packet_id(PacketId) when PacketId >= 65535 ->
next_packet_id(PacketId) when PacketId >= 4294967295 ->
0;
next_packet_id(PacketId) ->
PacketId + 1.
-spec pack(PacketId :: integer(), Type :: integer()) -> binary().
pack(PacketId, Type) when is_integer(PacketId), is_integer(Type) ->
<<PacketId:16, Type:8>>.
-spec pack(PacketId :: integer(), Type :: integer(), Body :: map()) -> binary().
pack(PacketId, Type, Body) when is_integer(PacketId), is_integer(Type), is_map(Body) ->
Message = iolist_to_binary(jiffy:encode(Body, [force_utf8])),
<<PacketId:16, Type:8, Message/binary>>.
-spec unpack(binary()) -> #efka_packet{}.
unpack(<<PacketId:16, Type:8, Body/binary>>) ->
Message = catch jiffy:decode(Body, [return_maps]),
#efka_packet{packet_id = PacketId, type = Type, message = Message}.
%%%===================================================================
%%% simple callbacks
%%%===================================================================
handle_poll_command(#{<<"device_uuid">> := DeviceUUID, <<"command">> := <<"query_status">>}) when is_binary(DeviceUUID) ->
case power_device:get_pid(DeviceUUID) of
undefined ->
#{
<<"c">> => 1,
<<"r">> => #{
<<"edge_status">> => -1,
<<"message">> => <<"设备信息不存在"/utf8>>
}
};
Pid ->
StatusMap = #{
0 => <<"离线"/utf8>>,
1 => <<"在线"/utf8>>
},
{ok, Status} = power_device:poll_status(Pid),
#{
<<"c">> => 1,
<<"r">> => #{
<<"edge_status">> => Status,
<<"message">> => maps:get(Status, StatusMap)
}
}
end.
-spec handle_param(Params :: map()) -> ok | {error, Reason :: binary()}.
handle_param(Params) when is_map(Params) ->
power_gateway_args:push_param(Params),
ok.
-spec handle_metric(Metric :: list()) -> ok | {error, Reason :: binary()}.
handle_metric(Metric) when is_list(Metric) ->
power_gateway_args:push_metric(Metric),
ok.
-spec handle_stream_call(ServiceName :: binary(), Fields :: list(), Tag :: map()) ->
{continue, NewServiceName :: binary(), NewFields :: list(), NewTag :: map()}
| {break, NewServiceName :: binary(), NewFields :: list(), NewTag :: map()}
| error.
handle_stream_call(ServiceName, Fields, Tag) when is_binary(ServiceName), is_list(Fields), is_map(Tag) ->
{continue, ServiceName, Fields, Tag}.