完善微服务的逻辑
This commit is contained in:
parent
935a2b25b7
commit
58efa14707
@ -36,6 +36,7 @@
|
|||||||
-define(METHOD_INFORM, 16#04).
|
-define(METHOD_INFORM, 16#04).
|
||||||
-define(METHOD_EVENT, 16#05).
|
-define(METHOD_EVENT, 16#05).
|
||||||
-define(METHOD_PHASE, 16#06).
|
-define(METHOD_PHASE, 16#06).
|
||||||
|
-define(METHOD_REQUEST_SERVICE_CONFIG, 16#07).
|
||||||
|
|
||||||
%%%% 命令类型子分类, 不需要返回值
|
%%%% 命令类型子分类, 不需要返回值
|
||||||
|
|
||||||
|
|||||||
@ -17,6 +17,7 @@
|
|||||||
%% API
|
%% API
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
-export([metric_data/3, event/3, ping/13]).
|
-export([metric_data/3, event/3, ping/13]).
|
||||||
|
-export([request_service_config/2, await_reply/2]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||||
@ -56,6 +57,21 @@ event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventT
|
|||||||
ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) ->
|
ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) ->
|
||||||
gen_server:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}).
|
gen_server:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}).
|
||||||
|
|
||||||
|
%% 请求微服务的配置
|
||||||
|
-spec request_service_config(ReceiverPid :: pid(), ServiceId :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: term()}.
|
||||||
|
request_service_config(ReceiverPid, ServiceId) when is_binary(ServiceId) ->
|
||||||
|
gen_server:call(?SERVER, {request_service_config, ReceiverPid, ServiceId}).
|
||||||
|
|
||||||
|
%% 等待消息的回复
|
||||||
|
-spec await_reply(Ref :: reference(), Timeout :: timeout()) -> {ok, Reply :: binary()} | {error, timeout}.
|
||||||
|
await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) ->
|
||||||
|
receive
|
||||||
|
{transport_reply, Ref, ReplyBin} ->
|
||||||
|
{ok, ReplyBin}
|
||||||
|
after Timeout ->
|
||||||
|
{error, timeout}
|
||||||
|
end.
|
||||||
|
|
||||||
%% @doc Spawns the server and registers the local name (unique)
|
%% @doc Spawns the server and registers the local name (unique)
|
||||||
-spec(start_link() ->
|
-spec(start_link() ->
|
||||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||||
@ -86,8 +102,14 @@ init([]) ->
|
|||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
handle_call(_Request, _From, State = #state{}) ->
|
handle_call({request_service_config, ReceiverPid, ServiceId}, _From, State = #state{transport_pid = TransportPid}) ->
|
||||||
{reply, ok, State}.
|
case is_pid(TransportPid) andalso is_process_alive(TransportPid) of
|
||||||
|
true ->
|
||||||
|
Ref = efka_transport:request(TransportPid, ReceiverPid, ?METHOD_REQUEST_SERVICE_CONFIG, ServiceId),
|
||||||
|
{reply, {ok, Ref}, State};
|
||||||
|
false ->
|
||||||
|
{reply, {error, <<"transport is not alive">>}, State}
|
||||||
|
end.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Handling cast messages
|
%% @doc Handling cast messages
|
||||||
|
|||||||
@ -98,6 +98,19 @@ init([ServiceId]) ->
|
|||||||
lager:notice("[efka_service] service_id: ~p, not found", [ServiceId]),
|
lager:notice("[efka_service] service_id: ~p, not found", [ServiceId]),
|
||||||
ignore;
|
ignore;
|
||||||
{ok, #service{root_dir = RootDir}} ->
|
{ok, #service{root_dir = RootDir}} ->
|
||||||
|
%% 尝试更新微服务的配置信息
|
||||||
|
case efka_agent:request_service_config(self(), ServiceId) of
|
||||||
|
{ok, Ref} ->
|
||||||
|
case efka_agent:await_reply(Ref, 5000) of
|
||||||
|
{ok, ConfigJson} ->
|
||||||
|
service_model:set_config(ServiceId, ConfigJson);
|
||||||
|
{error, Reason} ->
|
||||||
|
lager:debug("[efka_service] request_config get error: ~p", [Reason])
|
||||||
|
end;
|
||||||
|
{error, Reason} ->
|
||||||
|
lager:debug("[efka_service] request_config get error: ~p", [Reason])
|
||||||
|
end,
|
||||||
|
|
||||||
%% 第一次启动,要求必须成功;只有第一次启动成功,后续的重启逻辑才有意义
|
%% 第一次启动,要求必须成功;只有第一次启动成功,后续的重启逻辑才有意义
|
||||||
case efka_manifest:new(RootDir) of
|
case efka_manifest:new(RootDir) of
|
||||||
{ok, Manifest} ->
|
{ok, Manifest} ->
|
||||||
|
|||||||
@ -16,6 +16,7 @@
|
|||||||
%% API
|
%% API
|
||||||
-export([start_link/3]).
|
-export([start_link/3]).
|
||||||
-export([connect/1, auth_request/2, send/3, async_call_reply/3, stop/1]).
|
-export([connect/1, auth_request/2, send/3, async_call_reply/3, stop/1]).
|
||||||
|
-export([request/4]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||||
@ -27,7 +28,9 @@
|
|||||||
host :: string(),
|
host :: string(),
|
||||||
port :: integer(),
|
port :: integer(),
|
||||||
socket :: undefined | ssl:sslsocket(),
|
socket :: undefined | ssl:sslsocket(),
|
||||||
packet_id = 1
|
packet_id = 1,
|
||||||
|
%% 通过packet_id建立请求和响应的关系
|
||||||
|
inflight = #{}
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
@ -38,6 +41,12 @@
|
|||||||
auth_request(Pid, AuthBin) when is_pid(Pid), is_binary(AuthBin) ->
|
auth_request(Pid, AuthBin) when is_pid(Pid), is_binary(AuthBin) ->
|
||||||
gen_server:cast(Pid, {auth_request, AuthBin}).
|
gen_server:cast(Pid, {auth_request, AuthBin}).
|
||||||
|
|
||||||
|
-spec request(Pid :: pid(), ReceiverPid :: pid(), Method :: integer(), ReqBin :: binary()) -> Ref :: reference().
|
||||||
|
request(Pid, ReceiverPid, Method, ReqBin) when is_pid(Pid), is_binary(ReqBin) ->
|
||||||
|
Ref = make_ref(),
|
||||||
|
gen_server:cast(Pid, {request, Ref, ReceiverPid, Method, ReqBin}),
|
||||||
|
Ref.
|
||||||
|
|
||||||
-spec connect(Pid :: pid()) -> no_return().
|
-spec connect(Pid :: pid()) -> no_return().
|
||||||
connect(Pid) when is_pid(Pid) ->
|
connect(Pid) when is_pid(Pid) ->
|
||||||
gen_server:cast(Pid, connect).
|
gen_server:cast(Pid, connect).
|
||||||
@ -117,7 +126,7 @@ handle_cast({auth_request, AuthRequestBin}, State = #state{parent_pid = ParentPi
|
|||||||
ok = ssl:send(Socket, <<?PACKET_REQUEST, PacketId:32, ?METHOD_AUTH, AuthRequestBin/binary>>),
|
ok = ssl:send(Socket, <<?PACKET_REQUEST, PacketId:32, ?METHOD_AUTH, AuthRequestBin/binary>>),
|
||||||
%% 需要等待auth返回的结果
|
%% 需要等待auth返回的结果
|
||||||
receive
|
receive
|
||||||
{ssl, Socket, <<?PACKET_RESPONSE, PacketId:32, ?METHOD_AUTH, ReplyBin/binary>>} ->
|
{ssl, Socket, <<?PACKET_RESPONSE, PacketId:32, ReplyBin/binary>>} ->
|
||||||
ParentPid ! {auth_reply, {ok, ReplyBin}},
|
ParentPid ! {auth_reply, {ok, ReplyBin}},
|
||||||
{noreply, State#state{packet_id = PacketId + 1}};
|
{noreply, State#state{packet_id = PacketId + 1}};
|
||||||
{ssl, Socket, Info} ->
|
{ssl, Socket, Info} ->
|
||||||
@ -129,6 +138,11 @@ handle_cast({auth_request, AuthRequestBin}, State = #state{parent_pid = ParentPi
|
|||||||
{noreply, State#state{packet_id = PacketId + 1}}
|
{noreply, State#state{packet_id = PacketId + 1}}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
%% 提交请求
|
||||||
|
handle_cast({request, Ref, ReceiverPid, Method, ReqBin}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
|
||||||
|
ok = ssl:send(Socket, <<?PACKET_REQUEST, PacketId:32, Method:8, ReqBin/binary>>),
|
||||||
|
{noreply, State#state{packet_id = PacketId + 1, inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}};
|
||||||
|
|
||||||
handle_cast({send, Method, Packet}, State = #state{socket = Socket}) ->
|
handle_cast({send, Method, Packet}, State = #state{socket = Socket}) ->
|
||||||
ok = ssl:send(Socket, <<?PACKET_REQUEST, Method:8, Packet/binary>>),
|
ok = ssl:send(Socket, <<?PACKET_REQUEST, Method:8, Packet/binary>>),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
@ -158,6 +172,21 @@ handle_info({ssl, Socket, <<?PACKET_ASYNC_CALL, PacketId:32, AsyncCallBin/binary
|
|||||||
ParentPid ! {server_push, PacketId, AsyncCallBin},
|
ParentPid ! {server_push, PacketId, AsyncCallBin},
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
|
%% efka:request <-> iot:response
|
||||||
|
handle_info({ssl, Socket, <<?PACKET_RESPONSE, PacketId:32, ReplyBin/binary>>}, State = #state{socket = Socket, inflight = Inflight}) ->
|
||||||
|
case maps:take(PacketId, Inflight) of
|
||||||
|
error ->
|
||||||
|
{noreply, State};
|
||||||
|
{{Ref, ReceiverPid}, NInflight} ->
|
||||||
|
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
|
||||||
|
true ->
|
||||||
|
ReceiverPid ! {transport_reply, Ref, ReplyBin};
|
||||||
|
false ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
{noreply, State#state{inflight = NInflight}}
|
||||||
|
end;
|
||||||
|
|
||||||
handle_info({ssl_error, Socket, Reason}, State = #state{socket = Socket}) ->
|
handle_info({ssl_error, Socket, Reason}, State = #state{socket = Socket}) ->
|
||||||
lager:debug("[efka_transport] ssl error: ~p", [Reason]),
|
lager:debug("[efka_transport] ssl error: ~p", [Reason]),
|
||||||
{stop, normal, State};
|
{stop, normal, State};
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user