fix agent
This commit is contained in:
parent
0eb6eb1e7a
commit
556b1d797c
@ -16,7 +16,7 @@
|
|||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
-export([metric_data/4, event/3, ping/13, await_reply/2]).
|
-export([metric_data/4, event/3, ping/13]).
|
||||||
|
|
||||||
%% gen_statem callbacks
|
%% gen_statem callbacks
|
||||||
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
||||||
@ -34,11 +34,7 @@
|
|||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
transport_pid :: undefined | pid(),
|
transport_pid :: undefined | pid(),
|
||||||
transport_ref :: undefined | reference(),
|
transport_ref :: undefined | reference()
|
||||||
%% 服务器端推送的消息的未确认列表, 映射关系 #{Ref => PacketId}
|
|
||||||
push_inflight = #{},
|
|
||||||
%% 发送的请求的未确认列表, 映射关系 #{Ref => ReceiverPid}
|
|
||||||
request_inflight = #{}
|
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
@ -57,16 +53,6 @@ event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventT
|
|||||||
ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) ->
|
ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) ->
|
||||||
gen_statem:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}).
|
gen_statem:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}).
|
||||||
|
|
||||||
%% 等待消息的回复
|
|
||||||
-spec await_reply(Ref :: reference(), Timeout :: timeout()) -> {ok, Reply :: binary()} | {error, timeout}.
|
|
||||||
await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) ->
|
|
||||||
receive
|
|
||||||
{request_reply, Ref, ReplyBin} ->
|
|
||||||
{ok, ReplyBin}
|
|
||||||
after Timeout ->
|
|
||||||
{error, timeout}
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
||||||
%% initialize. To ensure a synchronized start-up procedure, this
|
%% initialize. To ensure a synchronized start-up procedure, this
|
||||||
%% function does not return until Module:init/1 has returned.
|
%% function does not return until Module:init/1 has returned.
|
||||||
@ -264,7 +250,7 @@ handle_event(info, {server_rpc, PacketId, <<?RPC_STOP_CONTAINER:8, ServiceId/bin
|
|||||||
{keep_state, State};
|
{keep_state, State};
|
||||||
|
|
||||||
%% config.json配置信息
|
%% config.json配置信息
|
||||||
handle_event(info, {server_rpc, PacketId, <<?RPC_CONFIG_CONTAINER:8, ConfigBin/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid, push_inflight = PushInflight}) ->
|
handle_event(info, {server_rpc, PacketId, <<?RPC_CONFIG_CONTAINER:8, ConfigBin/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
||||||
#container_config{container_name = ContainerName, config = Config} = message_pb:decode_msg(ConfigBin, push_service_config),
|
#container_config{container_name = ContainerName, config = Config} = message_pb:decode_msg(ConfigBin, push_service_config),
|
||||||
|
|
||||||
case docker_manager:config_container(ContainerName, Config) of
|
case docker_manager:config_container(ContainerName, Config) of
|
||||||
@ -313,16 +299,6 @@ handle_event(info, {server_pub, Topic, Content}, ?STATE_ACTIVATED, State) ->
|
|||||||
efka_subscription:publish(Topic, Content),
|
efka_subscription:publish(Topic, Content),
|
||||||
{keep_state, State};
|
{keep_state, State};
|
||||||
|
|
||||||
%% 收到来自服务器端的回复
|
|
||||||
handle_event(info, {server_reply, Ref, ReplyBin}, ?STATE_ACTIVATED, State = #state{request_inflight = RequestInflight}) ->
|
|
||||||
case maps:take(Ref, RequestInflight) of
|
|
||||||
error ->
|
|
||||||
{keep_state, State};
|
|
||||||
{ReceiverPid, NRequestInflight} ->
|
|
||||||
is_process_alive(ReceiverPid) andalso erlang:send(ReceiverPid, {request_reply, Ref, ReplyBin}),
|
|
||||||
{keep_state, State#state{push_inflight = NRequestInflight}}
|
|
||||||
end;
|
|
||||||
|
|
||||||
%% transport进程退出
|
%% transport进程退出
|
||||||
handle_event(info, {'DOWN', MRef, process, TransportPid, Reason}, _, State = #state{transport_ref = MRef}) ->
|
handle_event(info, {'DOWN', MRef, process, TransportPid, Reason}, _, State = #state{transport_ref = MRef}) ->
|
||||||
lager:debug("[efka_remote_agent] transport pid: ~p, exit with reason: ~p", [TransportPid, Reason]),
|
lager:debug("[efka_remote_agent] transport pid: ~p, exit with reason: ~p", [TransportPid, Reason]),
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user