diff --git a/apps/efka/include/efka.hrl b/apps/efka/include/efka.hrl index 348d40d..fcd8c1f 100644 --- a/apps/efka/include/efka.hrl +++ b/apps/efka/include/efka.hrl @@ -36,6 +36,7 @@ -define(METHOD_INFORM, 16#04). -define(METHOD_EVENT, 16#05). -define(METHOD_PHASE, 16#06). +-define(METHOD_REQUEST_SERVICE_CONFIG, 16#07). %%%% 命令类型子分类, 不需要返回值 diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 98da942..42afd69 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -17,6 +17,7 @@ %% API -export([start_link/0]). -export([metric_data/3, event/3, ping/13]). +-export([request_service_config/2, await_reply/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -56,6 +57,21 @@ event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventT ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) -> gen_server:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}). +%% 请求微服务的配置 +-spec request_service_config(ReceiverPid :: pid(), ServiceId :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: term()}. +request_service_config(ReceiverPid, ServiceId) when is_binary(ServiceId) -> + gen_server:call(?SERVER, {request_service_config, ReceiverPid, ServiceId}). + +%% 等待消息的回复 +-spec await_reply(Ref :: reference(), Timeout :: timeout()) -> {ok, Reply :: binary()} | {error, timeout}. +await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) -> + receive + {transport_reply, Ref, ReplyBin} -> + {ok, ReplyBin} + after Timeout -> + {error, timeout} + end. + %% @doc Spawns the server and registers the local name (unique) -spec(start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). @@ -86,8 +102,14 @@ init([]) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). -handle_call(_Request, _From, State = #state{}) -> - {reply, ok, State}. +handle_call({request_service_config, ReceiverPid, ServiceId}, _From, State = #state{transport_pid = TransportPid}) -> + case is_pid(TransportPid) andalso is_process_alive(TransportPid) of + true -> + Ref = efka_transport:request(TransportPid, ReceiverPid, ?METHOD_REQUEST_SERVICE_CONFIG, ServiceId), + {reply, {ok, Ref}, State}; + false -> + {reply, {error, <<"transport is not alive">>}, State} + end. %% @private %% @doc Handling cast messages diff --git a/apps/efka/src/efka_service.erl b/apps/efka/src/efka_service.erl index abf5823..9795ab6 100644 --- a/apps/efka/src/efka_service.erl +++ b/apps/efka/src/efka_service.erl @@ -98,6 +98,19 @@ init([ServiceId]) -> lager:notice("[efka_service] service_id: ~p, not found", [ServiceId]), ignore; {ok, #service{root_dir = RootDir}} -> + %% 尝试更新微服务的配置信息 + case efka_agent:request_service_config(self(), ServiceId) of + {ok, Ref} -> + case efka_agent:await_reply(Ref, 5000) of + {ok, ConfigJson} -> + service_model:set_config(ServiceId, ConfigJson); + {error, Reason} -> + lager:debug("[efka_service] request_config get error: ~p", [Reason]) + end; + {error, Reason} -> + lager:debug("[efka_service] request_config get error: ~p", [Reason]) + end, + %% 第一次启动,要求必须成功;只有第一次启动成功,后续的重启逻辑才有意义 case efka_manifest:new(RootDir) of {ok, Manifest} -> diff --git a/apps/efka/src/efka_transport.erl b/apps/efka/src/efka_transport.erl index eac520a..f3cb40a 100644 --- a/apps/efka/src/efka_transport.erl +++ b/apps/efka/src/efka_transport.erl @@ -16,6 +16,7 @@ %% API -export([start_link/3]). -export([connect/1, auth_request/2, send/3, async_call_reply/3, stop/1]). +-export([request/4]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -27,7 +28,9 @@ host :: string(), port :: integer(), socket :: undefined | ssl:sslsocket(), - packet_id = 1 + packet_id = 1, + %% 通过packet_id建立请求和响应的关系 + inflight = #{} }). %%%=================================================================== @@ -38,6 +41,12 @@ auth_request(Pid, AuthBin) when is_pid(Pid), is_binary(AuthBin) -> gen_server:cast(Pid, {auth_request, AuthBin}). +-spec request(Pid :: pid(), ReceiverPid :: pid(), Method :: integer(), ReqBin :: binary()) -> Ref :: reference(). +request(Pid, ReceiverPid, Method, ReqBin) when is_pid(Pid), is_binary(ReqBin) -> + Ref = make_ref(), + gen_server:cast(Pid, {request, Ref, ReceiverPid, Method, ReqBin}), + Ref. + -spec connect(Pid :: pid()) -> no_return(). connect(Pid) when is_pid(Pid) -> gen_server:cast(Pid, connect). @@ -117,7 +126,7 @@ handle_cast({auth_request, AuthRequestBin}, State = #state{parent_pid = ParentPi ok = ssl:send(Socket, <>), %% 需要等待auth返回的结果 receive - {ssl, Socket, <>} -> + {ssl, Socket, <>} -> ParentPid ! {auth_reply, {ok, ReplyBin}}, {noreply, State#state{packet_id = PacketId + 1}}; {ssl, Socket, Info} -> @@ -129,6 +138,11 @@ handle_cast({auth_request, AuthRequestBin}, State = #state{parent_pid = ParentPi {noreply, State#state{packet_id = PacketId + 1}} end; +%% 提交请求 +handle_cast({request, Ref, ReceiverPid, Method, ReqBin}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> + ok = ssl:send(Socket, <>), + {noreply, State#state{packet_id = PacketId + 1, inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}; + handle_cast({send, Method, Packet}, State = #state{socket = Socket}) -> ok = ssl:send(Socket, <>), {noreply, State}; @@ -158,6 +172,21 @@ handle_info({ssl, Socket, < iot:response +handle_info({ssl, Socket, <>}, State = #state{socket = Socket, inflight = Inflight}) -> + case maps:take(PacketId, Inflight) of + error -> + {noreply, State}; + {{Ref, ReceiverPid}, NInflight} -> + case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of + true -> + ReceiverPid ! {transport_reply, Ref, ReplyBin}; + false -> + ok + end, + {noreply, State#state{inflight = NInflight}} + end; + handle_info({ssl_error, Socket, Reason}, State = #state{socket = Socket}) -> lager:debug("[efka_transport] ssl error: ~p", [Reason]), {stop, normal, State};