diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index a911efa..98da942 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -284,7 +284,7 @@ handle_info({server_push, PacketId, <>}, State %% 消息发送到订阅系统 case efka_service:get_pid(ServiceId) of undefined -> - Reply = #async_call_reply{code = 0, message = <<"micro_service not run">>, result = <<>>}, + Reply = #async_call_reply{code = 0, message = <<"micro_service not run">>}, safe_async_call_reply(PacketId, message_pb:encode_msg(Reply), State), {noreply, State}; ServicePid when is_pid(ServicePid) -> diff --git a/apps/efka/src/efka_service.erl b/apps/efka/src/efka_service.erl index 79f58a3..abf5823 100644 --- a/apps/efka/src/efka_service.erl +++ b/apps/efka/src/efka_service.erl @@ -33,7 +33,10 @@ os_pid :: undefined | integer(), %% 配置信息 manifest :: undefined | efka_manifest:manifest(), - inflight = #{} + inflight = #{}, + + %% 映射关系: #{Ref => Fun} + callbacks = #{} }). %%%=================================================================== @@ -162,11 +165,13 @@ handle_cast({send_event, EventType, Params}, State = #state{service_id = Service {noreply, State}; %% 推送配置项目 -handle_cast({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{channel_pid = ChannelPid, inflight = Inflight}) -> +handle_cast({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{channel_pid = ChannelPid, service_id = ServiceId, inflight = Inflight, callbacks = Callbacks}) -> case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of true -> efka_tcp_channel:push_config(ChannelPid, Ref, self(), ConfigJson), - {noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}}; + %% 设置成功,需要更新微服务的配置 + CB = fun() -> service_model:set_config(ServiceId, ConfigJson) end, + {noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight), callbacks = maps:put(Ref, CB, Callbacks)}}; false -> ReceiverPid ! {service_reply, Ref, {error, <<"channel is not alive">>}}, {noreply, State} @@ -206,13 +211,14 @@ handle_info({timeout, _, reboot_service}, State = #state{service_id = ServiceId, end; %% 处理channel的回复 -handle_info({channel_reply, Ref, Reply}, State = #state{inflight = Inflight}) -> +handle_info({channel_reply, Ref, Reply}, State = #state{inflight = Inflight, callbacks = Callbacks}) -> case maps:take(Ref, Inflight) of error -> {noreply, State}; {ReceiverPid, NInflight} -> ReceiverPid ! {service_reply, Ref, Reply}, - {noreply, State#state{inflight = NInflight}} + + {noreply, State#state{inflight = NInflight, callbacks = trigger_callback(Ref, Callbacks)}} end; handle_info({Port, {data, Data}}, State = #state{service_id = ServiceId}) when is_port(Port) -> @@ -271,4 +277,14 @@ kill_os_pid(OSPid) when is_integer(OSPid) -> -spec try_reboot() -> no_return(). try_reboot() -> - erlang:start_timer(5000, self(), reboot_service). \ No newline at end of file + erlang:start_timer(5000, self(), reboot_service). + +-spec trigger_callback(Ref :: reference(), Callbacks :: map()) -> NewCallbacks :: map(). +trigger_callback(Ref, Callbacks) -> + case maps:take(Ref, Callbacks) of + error -> + Callbacks; + {Fun, NCallbacks} -> + catch Fun(), + NCallbacks + end. \ No newline at end of file