This commit is contained in:
anlicheng 2025-05-07 14:24:12 +08:00
parent b511b19bbf
commit 6259eb20e7
2 changed files with 28 additions and 40 deletions

View File

@ -109,55 +109,39 @@ handle_call(_Request, _From, State = #state{}) ->
{stop, Reason :: term(), NewState :: #state{}}).
%%
handle_cast({metric_data, ServiceId, LineProtocolData}, State = #state{status = Status, transport_pid = TransportPid}) ->
handle_cast({metric_data, ServiceId, LineProtocolData}, State) ->
Packet = message_pb:encode_msg(#data{
service_id = ServiceId,
metric = LineProtocolData
}),
safe_send(?METHOD_DATA, Packet, State),
case Status =:= ?STATE_ACTIVATED andalso is_pid(TransportPid) of
true ->
efka_transport:send(TransportPid, ?METHOD_DATA, Packet);
false ->
ok = micro_cache_model:insert(?METHOD_DATA, Packet)
end,
{noreply, State};
%% Event事件
handle_cast({event, ServiceId, EventType, Params}, State = #state{status = Status, transport_pid = TransportPid}) ->
handle_cast({event, ServiceId, EventType, Params}, State) ->
EventPacket = message_pb:encode_msg(#event{
service_id = ServiceId,
event_type = EventType,
params = Params
}),
case Status =:= ?STATE_ACTIVATED of
true ->
efka_transport:send(TransportPid, ?METHOD_EVENT, EventPacket);
false ->
ok = micro_cache_model:insert(?METHOD_EVENT, EventPacket)
end,
safe_send(?METHOD_EVENT, EventPacket, State),
{noreply, State};
%% AiEvent事件
handle_cast({ai_event, ServiceId, EventType, Params}, State = #state{status = Status, transport_pid = TransportPid}) ->
handle_cast({ai_event, ServiceId, EventType, Params}, State) ->
EventPacket = message_pb:encode_msg(#ai_event{
service_id = ServiceId,
event_type = EventType,
params = Params
}),
case Status =:= ?STATE_ACTIVATED of
true ->
efka_transport:send(TransportPid, ?METHOD_AI_EVENT, EventPacket);
false ->
ok = micro_cache_model:insert(?METHOD_AI_EVENT, EventPacket)
end,
safe_send(?METHOD_AI_EVENT, EventPacket, State),
{noreply, State};
handle_cast({feedback_phase, TaskId, Timestamp, Phase, Code, Message}, State = #state{status = Status, transport_pid = TransportPid}) ->
handle_cast({feedback_phase, TaskId, Timestamp, Phase, Code, Message}, State) ->
PhasePacket = message_pb:encode_msg(#feedback_phase{
task_id = TaskId,
timestamp = Timestamp,
@ -166,12 +150,7 @@ handle_cast({feedback_phase, TaskId, Timestamp, Phase, Code, Message}, State = #
message = Message
}),
case Status =:= ?STATE_ACTIVATED of
true ->
efka_transport:send(TransportPid, ?METHOD_PHASE, PhasePacket);
false ->
ok = micro_cache_model:insert(?METHOD_PHASE, PhasePacket)
end,
safe_send(?METHOD_PHASE, PhasePacket, State),
{noreply, State};
@ -194,7 +173,12 @@ handle_cast({ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ip
memory = Memory,
interfaces = Interfaces
}),
Status =:= ?STATE_ACTIVATED andalso efka_transport:send(TransportPid, ?METHOD_PING, Ping),
case Status =:= ?STATE_ACTIVATED andalso is_pid(TransportPid) of
true ->
efka_transport:send(TransportPid, ?METHOD_PING, Ping);
false ->
ok
end,
{noreply, State};
@ -419,11 +403,13 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
%%
-spec safe_response(PacketId :: integer(), Reply :: binary(), State :: #state{}) -> no_return().
safe_response(PacketId, Reply, #state{status = ?STATE_ACTIVATED, transport_pid = TransportPid}) when is_integer(PacketId), is_binary(Reply), is_pid(TransportPid) ->
case is_process_alive(TransportPid) of
true ->
efka_transport:response(TransportPid, PacketId, Reply);
false ->
ok
end;
is_process_alive(TransportPid) andalso efka_transport:response(TransportPid, PacketId, Reply);
safe_response(_PacketId, _Reply, #state{}) ->
ok.
ok.
%%
-spec safe_send(Method :: integer(), Packet :: binary(), State :: #state{}) -> no_return().
safe_send(Method, Packet, #state{status = ?STATE_ACTIVATED, transport_pid = TransportPid}) when is_pid(TransportPid) ->
efka_transport:send(TransportPid, Method, Packet);
safe_send(Method, Packet, #state{}) ->
ok = micro_cache_model:insert(Method, Packet).

View File

@ -42,12 +42,15 @@ auth_request(Pid, Timeout) when is_pid(Pid), is_integer(Timeout) ->
connect(Pid) when is_pid(Pid) ->
gen_server:cast(Pid, connect).
-spec send(Pid :: pid(), Method :: integer(), Packet :: binary()) -> no_return().
send(Pid, Method, Packet) when is_pid(Pid), is_integer(Method), is_binary(Packet) ->
gen_server:cast(Pid, {send, Method, Packet}).
response(Pid, PacketId, Response) when is_pid(Pid), is_integer(PacketId) ->
-spec response(Pid :: pid(), PacketId :: integer(), Response :: binary()) -> no_return().
response(Pid, PacketId, Response) when is_pid(Pid), is_integer(PacketId), is_binary(Response) ->
gen_server:cast(Pid, {response, PacketId, Response}).
-spec stop(Pid :: pid()) -> ok.
stop(Pid) when is_pid(Pid) ->
gen_server:stop(Pid, normal, 2000).
@ -71,8 +74,7 @@ init([ParentPid, Host, Port]) ->
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |