保存文件的配置信息
This commit is contained in:
parent
37e3040765
commit
935a2b25b7
@ -284,7 +284,7 @@ handle_info({server_push, PacketId, <<?PUSH_INVOKE:8, InvokeBin/binary>>}, State
|
||||
%% 消息发送到订阅系统
|
||||
case efka_service:get_pid(ServiceId) of
|
||||
undefined ->
|
||||
Reply = #async_call_reply{code = 0, message = <<"micro_service not run">>, result = <<>>},
|
||||
Reply = #async_call_reply{code = 0, message = <<"micro_service not run">>},
|
||||
safe_async_call_reply(PacketId, message_pb:encode_msg(Reply), State),
|
||||
{noreply, State};
|
||||
ServicePid when is_pid(ServicePid) ->
|
||||
|
||||
@ -33,7 +33,10 @@
|
||||
os_pid :: undefined | integer(),
|
||||
%% 配置信息
|
||||
manifest :: undefined | efka_manifest:manifest(),
|
||||
inflight = #{}
|
||||
inflight = #{},
|
||||
|
||||
%% 映射关系: #{Ref => Fun}
|
||||
callbacks = #{}
|
||||
}).
|
||||
|
||||
%%%===================================================================
|
||||
@ -162,11 +165,13 @@ handle_cast({send_event, EventType, Params}, State = #state{service_id = Service
|
||||
{noreply, State};
|
||||
|
||||
%% 推送配置项目
|
||||
handle_cast({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{channel_pid = ChannelPid, inflight = Inflight}) ->
|
||||
handle_cast({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{channel_pid = ChannelPid, service_id = ServiceId, inflight = Inflight, callbacks = Callbacks}) ->
|
||||
case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of
|
||||
true ->
|
||||
efka_tcp_channel:push_config(ChannelPid, Ref, self(), ConfigJson),
|
||||
{noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}};
|
||||
%% 设置成功,需要更新微服务的配置
|
||||
CB = fun() -> service_model:set_config(ServiceId, ConfigJson) end,
|
||||
{noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight), callbacks = maps:put(Ref, CB, Callbacks)}};
|
||||
false ->
|
||||
ReceiverPid ! {service_reply, Ref, {error, <<"channel is not alive">>}},
|
||||
{noreply, State}
|
||||
@ -206,13 +211,14 @@ handle_info({timeout, _, reboot_service}, State = #state{service_id = ServiceId,
|
||||
end;
|
||||
|
||||
%% 处理channel的回复
|
||||
handle_info({channel_reply, Ref, Reply}, State = #state{inflight = Inflight}) ->
|
||||
handle_info({channel_reply, Ref, Reply}, State = #state{inflight = Inflight, callbacks = Callbacks}) ->
|
||||
case maps:take(Ref, Inflight) of
|
||||
error ->
|
||||
{noreply, State};
|
||||
{ReceiverPid, NInflight} ->
|
||||
ReceiverPid ! {service_reply, Ref, Reply},
|
||||
{noreply, State#state{inflight = NInflight}}
|
||||
|
||||
{noreply, State#state{inflight = NInflight, callbacks = trigger_callback(Ref, Callbacks)}}
|
||||
end;
|
||||
|
||||
handle_info({Port, {data, Data}}, State = #state{service_id = ServiceId}) when is_port(Port) ->
|
||||
@ -271,4 +277,14 @@ kill_os_pid(OSPid) when is_integer(OSPid) ->
|
||||
|
||||
-spec try_reboot() -> no_return().
|
||||
try_reboot() ->
|
||||
erlang:start_timer(5000, self(), reboot_service).
|
||||
erlang:start_timer(5000, self(), reboot_service).
|
||||
|
||||
-spec trigger_callback(Ref :: reference(), Callbacks :: map()) -> NewCallbacks :: map().
|
||||
trigger_callback(Ref, Callbacks) ->
|
||||
case maps:take(Ref, Callbacks) of
|
||||
error ->
|
||||
Callbacks;
|
||||
{Fun, NCallbacks} ->
|
||||
catch Fun(),
|
||||
NCallbacks
|
||||
end.
|
||||
Loading…
x
Reference in New Issue
Block a user