This commit is contained in:
anlicheng 2025-05-06 16:40:06 +08:00
parent 9d92fe98ab
commit bd9e3b6858
4 changed files with 25 additions and 30 deletions

View File

@ -217,7 +217,7 @@ handle_info({server_push_message, PacketId, <<?METHOD_ARGUMENTS:8, ArgumentsBin/
message = <<"服务未启动"/utf8>> message = <<"服务未启动"/utf8>>
}, },
efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)); efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply));
{ok, ServicePid} -> ServicePid when is_pid(ServicePid) ->
case efka_micro_service:push_arguments(ServicePid, Args) of case efka_micro_service:push_arguments(ServicePid, Args) of
ok -> ok ->
Reply = #efka_response{ Reply = #efka_response{
@ -246,7 +246,7 @@ handle_info({server_push_message, PacketId, <<?METHOD_METRICS:8, MetricsBin/bina
message = <<"服务未启动"/utf8>> message = <<"服务未启动"/utf8>>
}, },
efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)); efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply));
{ok, ServicePid} -> ServicePid when is_pid(ServicePid) ->
case efka_micro_service:push_metrics(ServicePid, Metrics) of case efka_micro_service:push_metrics(ServicePid, Metrics) of
ok -> ok ->
Reply = #efka_response { Reply = #efka_response {

View File

@ -46,7 +46,7 @@
%% API %% API
-export([start_link/3]). -export([start_link/3]).
-export([device_offline/1, device_online/1]). -export([device_offline/1, device_online/1]).
-export([send_metric_data/2, invoke_service/3, send_log/1, request_metric/0, request_param/0, send_event/2, controller_process/1]). -export([send_metric_data/2, send_log/1, request_metric/0, request_param/0, send_event/2, controller_process/1]).
-export([test/0]). -export([test/0]).
@ -54,7 +54,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-record(state, { -record(state, {
packet_id = 0 :: integer(), packet_id = 1 :: integer(),
host :: string(), host :: string(),
port :: integer(), port :: integer(),
%% %%
@ -75,12 +75,6 @@ send_metric_data(Fields, Tags) when is_list(Fields), is_map(Tags) ->
{ok, Ref} = gen_server:call(?MODULE, {send_metric_data, self(), Fields, Tags}), {ok, Ref} = gen_server:call(?MODULE, {send_metric_data, self(), Fields, Tags}),
await_reply(Ref, ?EFKA_REQUEST_TIMEOUT). await_reply(Ref, ?EFKA_REQUEST_TIMEOUT).
-spec invoke_service(ToService :: binary(), Message :: map(), Timeout :: integer()) ->
{ok, Result :: any()} | {error, Reason :: any()}.
invoke_service(ToService, Message, Timeout) when is_binary(ToService), is_map(Message), is_integer(Timeout) ->
{ok, Ref} = gen_server:call(?MODULE, {invoke_service, self(), ToService, Message, Timeout}),
await_reply(Ref, ?EFKA_REQUEST_TIMEOUT).
-spec send_log(Message :: binary() | map()) -> no_return(). -spec send_log(Message :: binary() | map()) -> no_return().
send_log(Message) when is_binary(Message); is_map(Message) -> send_log(Message) when is_binary(Message); is_map(Message) ->
gen_server:cast(?MODULE, {send_log, Message}). gen_server:cast(?MODULE, {send_log, Message}).
@ -152,27 +146,18 @@ start_link(MicroServiceId, Host, Port) when is_binary(MicroServiceId), is_list(H
init([MicroServiceId, Host, Port]) -> init([MicroServiceId, Host, Port]) ->
{ok, Socket} = gen_tcp:connect(Host, Port, [binary, {packet, 4}, {active, true}]), {ok, Socket} = gen_tcp:connect(Host, Port, [binary, {packet, 4}, {active, true}]),
ok = gen_tcp:controlling_process(Socket, self()), ok = gen_tcp:controlling_process(Socket, self()),
case do_register(MicroServiceId, Socket) of
ok ->
{ok, #state{packet_id = 1, host = Host, port = Port, socket = Socket}};
{error, Reason} ->
{stop, Reason}
end.
%% efka服务器的注册 Packet = <<1:32, ?PACKET_TYPE_REGISTER:8, MicroServiceId/binary>>,
do_register(MicroServiceId, Socket) ->
PacketId = 0,
Packet = <<PacketId:32, ?PACKET_TYPE_REGISTER:8, MicroServiceId/binary>>,
ok = gen_tcp:send(Socket, Packet), ok = gen_tcp:send(Socket, Packet),
lager:debug("[efka_client] will send packet: ~p", [Packet]), lager:debug("[efka_client] will send packet: ~p", [Packet]),
receive receive
{tcp, Socket, <<PacketId:32, ?PACKET_TYPE_RESPONSE, 1:8>>} -> {tcp, Socket, <<1:32, ?PACKET_TYPE_RESPONSE, 1:8>>} ->
ok; {ok, #state{packet_id = 2, host = Host, port = Port, socket = Socket}};
{tcp, Socket, <<PacketId:32, ?PACKET_TYPE_RESPONSE, 0:8, Error/binary>>} -> {tcp, Socket, <<1:32, ?PACKET_TYPE_RESPONSE, 0:8, Error/binary>>} ->
{error, Error} {stop, Error}
after after
?EFKA_REQUEST_TIMEOUT -> ?EFKA_REQUEST_TIMEOUT ->
{error, timeout} {stop, register_timeout}
end. end.
%% @private %% @private

View File

@ -64,11 +64,11 @@ get_name(ServiceId) when is_binary(ServiceId) ->
get_pid(ServiceId) when is_binary(ServiceId) -> get_pid(ServiceId) when is_binary(ServiceId) ->
whereis(get_name(ServiceId)). whereis(get_name(ServiceId)).
push_arguments(Pid, Args) -> push_arguments(Pid, Args) when is_pid(Pid), is_binary(Args) ->
ok. gen_server:call(Pid, {push_arguments, Args}).
push_metrics(Pid, Metrics) -> push_metrics(Pid, Metrics) when is_pid(Pid), is_binary(Metrics) ->
ok. gen_server:call(Pid, {push_metrics, Metrics}).
-spec attach_channel(pid(), pid()) -> ok | {error, Reason :: binary()}. -spec attach_channel(pid(), pid()) -> ok | {error, Reason :: binary()}.
attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) -> attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) ->
@ -143,6 +143,17 @@ handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = Ol
{reply, {error, <<"serivce stopped">>}, State} {reply, {error, <<"serivce stopped">>}, State}
end; end;
%%
handle_call({push_arguments, Args}, _From, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid}) ->
case is_pid(ChannelPid) of
true ->
ok;
false ->
ok
end,
ok;
%% : %% :
handle_call(start_service, _From, State = #state{running_status = ?STATUS_RUNNING}) -> handle_call(start_service, _From, State = #state{running_status = ?STATUS_RUNNING}) ->
{reply, {error, <<"service is running">>}, State}; {reply, {error, <<"service is running">>}, State};

View File

@ -52,7 +52,6 @@ init([]) ->
Spec1 = child_spec(#micro_service{ Spec1 = child_spec(#micro_service{
service_id = <<"test1234">>, service_id = <<"test1234">>,
service_name = <<"测试服务">>,
from = <<"master">>, from = <<"master">>,
%% %%
work_dir = <<"/usr/local/code/tmp/test/">>, work_dir = <<"/usr/local/code/tmp/test/">>,