This commit is contained in:
anlicheng 2025-05-09 16:02:59 +08:00
parent fe87dabbb8
commit 41c1a7cf78
3 changed files with 15 additions and 13 deletions

View File

@ -49,6 +49,10 @@
metric_data(ServiceId, DeviceUUID, LineProtocolData) when is_binary(ServiceId), is_binary(DeviceUUID), is_binary(LineProtocolData) -> metric_data(ServiceId, DeviceUUID, LineProtocolData) when is_binary(ServiceId), is_binary(DeviceUUID), is_binary(LineProtocolData) ->
gen_server:cast(?SERVER, {metric_data, ServiceId, DeviceUUID, LineProtocolData}). gen_server:cast(?SERVER, {metric_data, ServiceId, DeviceUUID, LineProtocolData}).
-spec event(ServiceId :: binary(), EventType :: integer(), Params :: binary()) -> no_return().
event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventType), is_binary(Params) ->
gen_server:cast(?SERVER, {event, ServiceId, EventType, Params}).
ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) -> ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) ->
gen_server:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}). gen_server:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}).
@ -56,10 +60,6 @@ ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, Cp
feedback_phase(TaskId, Timestamp, Phase) when is_integer(TaskId), is_integer(Timestamp), is_binary(Phase) -> feedback_phase(TaskId, Timestamp, Phase) when is_integer(TaskId), is_integer(Timestamp), is_binary(Phase) ->
gen_server:cast(?SERVER, {feedback_phase, TaskId, Timestamp, Phase}). gen_server:cast(?SERVER, {feedback_phase, TaskId, Timestamp, Phase}).
-spec event(ServiceId :: binary(), EventType :: integer(), Params :: binary()) -> no_return().
event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventType), is_binary(Params) ->
gen_server:cast(?SERVER, {event, ServiceId, EventType, Params}).
%% @doc Spawns the server and registers the local name (unique) %% @doc Spawns the server and registers the local name (unique)
-spec(start_link() -> -spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}). {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
@ -236,13 +236,14 @@ handle_info({auth_reply, Reply}, State = #state{status = ?STATE_AUTH, transport_
handle_info({server_push, PacketId, <<?PUSH_DEPLOY:8, DeployBin/binary>>}, State = #state{transport_pid = TransportPid}) -> handle_info({server_push, PacketId, <<?PUSH_DEPLOY:8, DeployBin/binary>>}, State = #state{transport_pid = TransportPid}) ->
#deploy{task_id = TaskId, service_id = ServiceId, tar_url = TarUrl} = message_pb:decode_msg(DeployBin, deploy), #deploy{task_id = TaskId, service_id = ServiceId, tar_url = TarUrl} = message_pb:decode_msg(DeployBin, deploy),
%% efka_inetd收到消息后就立即返回了
Reply = case efka_inetd:deploy(TaskId, ServiceId, TarUrl) of Reply = case efka_inetd:deploy(TaskId, ServiceId, TarUrl) of
ok -> ok ->
#push_reply{code = 1, message = <<"">>}; #push_reply{code = 1, message = <<"">>};
{error, Reason} when is_binary(Reason) -> {error, Reason} when is_binary(Reason) ->
#push_reply{code = 1, message = Reason} #push_reply{code = 0, message = Reason}
end, end,
efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)), is_pid(TransportPid) andalso efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)),
{noreply, State}; {noreply, State};
@ -252,13 +253,13 @@ handle_info({server_push, PacketId, <<?PUSH_SERVICE_CONFIG:8, ConfigBin/binary>>
case efka_service:get_pid(ServiceId) of case efka_service:get_pid(ServiceId) of
undefined -> undefined ->
Reply = #push_reply{code = 0, message = <<"service not run">>, result = <<>>}, Reply = #push_reply{code = 0, message = <<"service not run">>},
efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)), is_pid(TransportPid) andalso efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)),
{noreply, State}; {noreply, State};
ServicePid when is_pid(ServicePid) -> ServicePid when is_pid(ServicePid) ->
Ref = make_ref(), Ref = make_ref(),
%%
efka_service:push_config(ServicePid, Ref, ConfigJson), efka_service:push_config(ServicePid, Ref, ConfigJson),
%% %%
erlang:start_timer(Timeout * 1000, self(), {request_timeout, Ref}), erlang:start_timer(Timeout * 1000, self(), {request_timeout, Ref}),
@ -310,7 +311,7 @@ handle_info({server_pub, Topic, Content}, State = #state{status = ?STATE_ACTIVAT
{noreply, State}; {noreply, State};
%% efka_service的回复 %% efka_service的回复
handle_info({ems_reply, Ref, EmsReply}, State = #state{inflight = Inflight}) -> handle_info({service_reply, Ref, EmsReply}, State = #state{inflight = Inflight}) ->
case maps:take(Ref, Inflight) of case maps:take(Ref, Inflight) of
error -> error ->
{noreply, State}; {noreply, State};

View File

@ -227,7 +227,7 @@ handle_cast({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{running_
efka_tcp_channel:push_config(ChannelPid, Ref, self(), ConfigJson), efka_tcp_channel:push_config(ChannelPid, Ref, self(), ConfigJson),
{noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}}; {noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}};
false -> false ->
ReceiverPid ! {ems_reply, Ref, {error, <<"channel is not alive">>}}, ReceiverPid ! {service_reply, Ref, {error, <<"channel is not alive">>}},
{reply, State} {reply, State}
end; end;
@ -238,7 +238,7 @@ handle_cast({invoke, Ref, ReceiverPid, Payload}, State = #state{running_status =
efka_tcp_channel:invoke(ChannelPid, Ref, self(), Payload), efka_tcp_channel:invoke(ChannelPid, Ref, self(), Payload),
{noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}}; {noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}};
false -> false ->
ReceiverPid ! {ems_reply, Ref, {error, <<"channel is not alive">>}}, ReceiverPid ! {service_reply, Ref, {error, <<"channel is not alive">>}},
{reply, State} {reply, State}
end; end;
@ -275,7 +275,7 @@ handle_info({channel_reply, Ref, Reply}, State = #state{inflight = Inflight}) ->
error -> error ->
{noreply, State}; {noreply, State};
{ReceiverPid, NInflight} -> {ReceiverPid, NInflight} ->
ReceiverPid ! {ems_reply, Ref, Reply}, ReceiverPid ! {service_reply, Ref, Reply},
{noreply, State#state{inflight = NInflight}} {noreply, State#state{inflight = NInflight}}
end; end;

View File

@ -25,6 +25,7 @@ message Pub {
///// /////
message PushReply { message PushReply {
// 0: 1:
uint32 code = 1; uint32 code = 1;
string result = 2; string result = 2;
string message = 3; string message = 3;