This commit is contained in:
anlicheng 2025-05-07 14:07:04 +08:00
parent 143829d730
commit b511b19bbf
3 changed files with 14 additions and 27 deletions

View File

@ -45,15 +45,18 @@
%%%===================================================================
%%
-spec metric_data(ServiceId :: binary(), LineProtocolData :: binary()) -> no_return().
metric_data(ServiceId, LineProtocolData) when is_binary(ServiceId), is_binary(LineProtocolData) ->
gen_server:cast(?SERVER, {metric_data, ServiceId, LineProtocolData}).
ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) ->
gen_server:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}).
-spec feedback_phase(TaskId :: integer(), Timestamp :: integer(), Phase :: binary(), Code :: integer()) -> no_return().
feedback_phase(TaskId, Timestamp, Phase, Code) when is_integer(TaskId), is_integer(Timestamp), is_binary(Phase), is_integer(Code) ->
gen_server:cast(?SERVER, {feedback_phase, TaskId, Timestamp, Phase, Code, <<"">>}).
-spec feedback_phase(TaskId :: integer(), Timestamp :: integer(), Phase :: binary(), Code :: integer(), Message :: binary()) -> no_return().
feedback_phase(TaskId, Timestamp, Phase, Code, Message) when is_integer(TaskId), is_integer(Timestamp), is_binary(Phase), is_integer(Code), is_binary(Message) ->
gen_server:cast(?SERVER, {feedback_phase, TaskId, Timestamp, Phase, Code, Message}).
@ -83,7 +86,7 @@ start_link() ->
init([]) ->
erlang:process_flag(trap_exit, true),
erlang:start_timer(0, self(), create_transport),
{ok, #state{}}.
{ok, #state{status = ?STATE_DENIED}}.
%% @private
%% @doc Handling call messages
@ -112,17 +115,12 @@ handle_cast({metric_data, ServiceId, LineProtocolData}, State = #state{status =
metric = LineProtocolData
}),
case Status =:= ?STATE_ACTIVATED of
case Status =:= ?STATE_ACTIVATED andalso is_pid(TransportPid) of
true ->
efka_transport:send(TransportPid, ?METHOD_DATA, Packet);
false ->
ok = micro_cache_model:insert(#micro_cache {
id = micro_cache_model:next_id(),
method = ?METHOD_DATA,
data = Packet
})
ok = micro_cache_model:insert(?METHOD_DATA, Packet)
end,
{noreply, State};
%% Event事件
@ -137,11 +135,7 @@ handle_cast({event, ServiceId, EventType, Params}, State = #state{status = Statu
true ->
efka_transport:send(TransportPid, ?METHOD_EVENT, EventPacket);
false ->
ok = micro_cache_model:insert(#micro_cache {
id = micro_cache_model:next_id(),
method = ?METHOD_EVENT,
data = EventPacket
})
ok = micro_cache_model:insert(?METHOD_EVENT, EventPacket)
end,
{noreply, State};
@ -158,11 +152,7 @@ handle_cast({ai_event, ServiceId, EventType, Params}, State = #state{status = St
true ->
efka_transport:send(TransportPid, ?METHOD_AI_EVENT, EventPacket);
false ->
ok = micro_cache_model:insert(#micro_cache {
id = micro_cache_model:next_id(),
method = ?METHOD_AI_EVENT,
data = EventPacket
})
ok = micro_cache_model:insert(?METHOD_AI_EVENT, EventPacket)
end,
{noreply, State};
@ -180,11 +170,7 @@ handle_cast({feedback_phase, TaskId, Timestamp, Phase, Code, Message}, State = #
true ->
efka_transport:send(TransportPid, ?METHOD_PHASE, PhasePacket);
false ->
ok = micro_cache_model:insert(#micro_cache {
id = micro_cache_model:next_id(),
method = ?METHOD_PHASE,
data = PhasePacket
})
ok = micro_cache_model:insert(?METHOD_PHASE, PhasePacket)
end,
{noreply, State};

View File

@ -303,7 +303,7 @@ handle_info({Port, {exit_status, Code}}, State = #state{service_id = ServiceId})
%% channel进程的退出
handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, service_id = ServiceId}) ->
lager:debug("[efka_micro_service] service: ~p, channel exited: ~p", [ServiceId, Reason]),
{noreply, State#state{channel_pid = undefined}}.
{noreply, State#state{channel_pid = undefined, inflight = #{}}}.
%% @private
%% @doc This function is called by a gen_server when it is about to

View File

@ -15,7 +15,7 @@
%% API
-export([create_table/0]).
-export([insert/1, get_all_cache/0, fetch_next/0, delete/1, next_id/0]).
-export([insert/2, get_all_cache/0, fetch_next/0, delete/1, next_id/0]).
create_table() ->
%% id生成器
@ -29,8 +29,9 @@ create_table() ->
next_id() ->
id_generator_model:next_id(?TAB).
-spec insert(MicroData0 :: #micro_cache{}) -> ok | {error, Reason :: any()}.
insert(Cache = #micro_cache{}) ->
-spec insert(Method :: integer(), Data :: binary()) -> ok | {error, Reason :: any()}.
insert(Method, Data) when is_integer(Method), is_binary(Data) ->
Cache = #micro_cache{id = next_id(), method = Method, data = Data},
case mnesia:transaction(fun() -> mnesia:write(?TAB, Cache, write) end) of
{'atomic', ok} ->
ok;