diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index a925b21..422611f 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -45,15 +45,18 @@ %%%=================================================================== %% 发送数据 +-spec metric_data(ServiceId :: binary(), LineProtocolData :: binary()) -> no_return(). metric_data(ServiceId, LineProtocolData) when is_binary(ServiceId), is_binary(LineProtocolData) -> gen_server:cast(?SERVER, {metric_data, ServiceId, LineProtocolData}). ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) -> gen_server:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}). +-spec feedback_phase(TaskId :: integer(), Timestamp :: integer(), Phase :: binary(), Code :: integer()) -> no_return(). feedback_phase(TaskId, Timestamp, Phase, Code) when is_integer(TaskId), is_integer(Timestamp), is_binary(Phase), is_integer(Code) -> gen_server:cast(?SERVER, {feedback_phase, TaskId, Timestamp, Phase, Code, <<"">>}). +-spec feedback_phase(TaskId :: integer(), Timestamp :: integer(), Phase :: binary(), Code :: integer(), Message :: binary()) -> no_return(). feedback_phase(TaskId, Timestamp, Phase, Code, Message) when is_integer(TaskId), is_integer(Timestamp), is_binary(Phase), is_integer(Code), is_binary(Message) -> gen_server:cast(?SERVER, {feedback_phase, TaskId, Timestamp, Phase, Code, Message}). @@ -83,7 +86,7 @@ start_link() -> init([]) -> erlang:process_flag(trap_exit, true), erlang:start_timer(0, self(), create_transport), - {ok, #state{}}. + {ok, #state{status = ?STATE_DENIED}}. %% @private %% @doc Handling call messages @@ -112,17 +115,12 @@ handle_cast({metric_data, ServiceId, LineProtocolData}, State = #state{status = metric = LineProtocolData }), - case Status =:= ?STATE_ACTIVATED of + case Status =:= ?STATE_ACTIVATED andalso is_pid(TransportPid) of true -> efka_transport:send(TransportPid, ?METHOD_DATA, Packet); false -> - ok = micro_cache_model:insert(#micro_cache { - id = micro_cache_model:next_id(), - method = ?METHOD_DATA, - data = Packet - }) + ok = micro_cache_model:insert(?METHOD_DATA, Packet) end, - {noreply, State}; %% Event事件 @@ -137,11 +135,7 @@ handle_cast({event, ServiceId, EventType, Params}, State = #state{status = Statu true -> efka_transport:send(TransportPid, ?METHOD_EVENT, EventPacket); false -> - ok = micro_cache_model:insert(#micro_cache { - id = micro_cache_model:next_id(), - method = ?METHOD_EVENT, - data = EventPacket - }) + ok = micro_cache_model:insert(?METHOD_EVENT, EventPacket) end, {noreply, State}; @@ -158,11 +152,7 @@ handle_cast({ai_event, ServiceId, EventType, Params}, State = #state{status = St true -> efka_transport:send(TransportPid, ?METHOD_AI_EVENT, EventPacket); false -> - ok = micro_cache_model:insert(#micro_cache { - id = micro_cache_model:next_id(), - method = ?METHOD_AI_EVENT, - data = EventPacket - }) + ok = micro_cache_model:insert(?METHOD_AI_EVENT, EventPacket) end, {noreply, State}; @@ -180,11 +170,7 @@ handle_cast({feedback_phase, TaskId, Timestamp, Phase, Code, Message}, State = # true -> efka_transport:send(TransportPid, ?METHOD_PHASE, PhasePacket); false -> - ok = micro_cache_model:insert(#micro_cache { - id = micro_cache_model:next_id(), - method = ?METHOD_PHASE, - data = PhasePacket - }) + ok = micro_cache_model:insert(?METHOD_PHASE, PhasePacket) end, {noreply, State}; diff --git a/apps/efka/src/efka_micro_service.erl b/apps/efka/src/efka_micro_service.erl index b320b78..3b5e0e2 100644 --- a/apps/efka/src/efka_micro_service.erl +++ b/apps/efka/src/efka_micro_service.erl @@ -303,7 +303,7 @@ handle_info({Port, {exit_status, Code}}, State = #state{service_id = ServiceId}) %% 处理channel进程的退出 handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, service_id = ServiceId}) -> lager:debug("[efka_micro_service] service: ~p, channel exited: ~p", [ServiceId, Reason]), - {noreply, State#state{channel_pid = undefined}}. + {noreply, State#state{channel_pid = undefined, inflight = #{}}}. %% @private %% @doc This function is called by a gen_server when it is about to diff --git a/apps/efka/src/mnesia/micro_cache_model.erl b/apps/efka/src/mnesia/micro_cache_model.erl index 672209d..92eab08 100644 --- a/apps/efka/src/mnesia/micro_cache_model.erl +++ b/apps/efka/src/mnesia/micro_cache_model.erl @@ -15,7 +15,7 @@ %% API -export([create_table/0]). --export([insert/1, get_all_cache/0, fetch_next/0, delete/1, next_id/0]). +-export([insert/2, get_all_cache/0, fetch_next/0, delete/1, next_id/0]). create_table() -> %% id生成器 @@ -29,8 +29,9 @@ create_table() -> next_id() -> id_generator_model:next_id(?TAB). --spec insert(MicroData0 :: #micro_cache{}) -> ok | {error, Reason :: any()}. -insert(Cache = #micro_cache{}) -> +-spec insert(Method :: integer(), Data :: binary()) -> ok | {error, Reason :: any()}. +insert(Method, Data) when is_integer(Method), is_binary(Data) -> + Cache = #micro_cache{id = next_id(), method = Method, data = Data}, case mnesia:transaction(fun() -> mnesia:write(?TAB, Cache, write) end) of {'atomic', ok} -> ok;