diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 70f7714..c839dc1 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -343,7 +343,7 @@ handle_event(info, {server_command, ?COMMAND_AUTH, <>}, StateName, State {next_state, ?STATE_RESTRICTED, State} end; -%% 收到需要回复的指令 +%% 处理Pub/Sub机制 handle_event(info, {server_pub, Topic, Content}, ?STATE_ACTIVATED, State) -> lager:debug("[efka_agent] get pub topic: ~p, content: ~p", [Topic, Content]), %% 消息发送到订阅系统 diff --git a/apps/efka/src/efka_service.erl b/apps/efka/src/efka_service.erl index abf5823..b316d7f 100644 --- a/apps/efka/src/efka_service.erl +++ b/apps/efka/src/efka_service.erl @@ -162,6 +162,7 @@ handle_cast({metric_data, DeviceUUID, LineProtocolData}, State = #state{service_ handle_cast({send_event, EventType, Params}, State = #state{service_id = ServiceId}) -> efka_agent:event(ServiceId, EventType, Params), + lager:debug("[efka_service] send_event, service_id: ~p, event_type: ~p, params: ~p", [ServiceId, EventType, Params]), {noreply, State}; %% 推送配置项目 diff --git a/apps/efka/src/efka_subscription.erl b/apps/efka/src/efka_subscription.erl index 3dd0e85..9ea6e7c 100644 --- a/apps/efka/src/efka_subscription.erl +++ b/apps/efka/src/efka_subscription.erl @@ -100,9 +100,8 @@ handle_cast({subscribe, Topic, SubscriberPid}, State = #state{subscribers = Subs {noreply, State} end; -%% 不需要响应的推送 +%% 发布消息 handle_cast({publish, Topic, Content}, State = #state{subscribers = Subscribers}) -> - %% 不需要回复的消息体采用广播的信息 MatchedSubscribers = match_subscribers(Subscribers, Topic), lists:foreach(fun(#subscriber{subscriber_pid = SubscriberPid}) -> SubscriberPid ! {topic_broadcast, Topic, Content} diff --git a/apps/efka/src/efka_sup.erl b/apps/efka/src/efka_sup.erl index c6db84e..212e443 100644 --- a/apps/efka/src/efka_sup.erl +++ b/apps/efka/src/efka_sup.erl @@ -37,6 +37,15 @@ init([]) -> modules => ['efka_inetd_task_log'] }, + #{ + id => 'efka_subscription', + start => {'efka_subscription', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['efka_subscription'] + }, + #{ id => 'efka_inetd', start => {'efka_inetd', start_link, []}, diff --git a/apps/efka/src/efka_tcp_channel.erl b/apps/efka/src/efka_tcp_channel.erl index 0c5b31d..6a7c7f6 100644 --- a/apps/efka/src/efka_tcp_channel.erl +++ b/apps/efka/src/efka_tcp_channel.erl @@ -188,6 +188,11 @@ handle_info({topic_broadcast, Topic, Content}, State = #state{socket = Socket}) {noreply, State}; +%% service进程关闭 +handle_info({'DOWN', _Ref, process, ServicePid, Reason}, State = #state{service_pid = ServicePid}) -> + lager:debug("[tcp_channel] service_pid: ~p, exited: ~p", [ServicePid, Reason]), + {stop, normal, State#state{service_pid = undefined}}; + handle_info({tcp_error, Socket, Reason}, State = #state{socket = Socket, service_id = ServiceId}) -> lager:debug("[tcp_channel] tcp_error: ~p, assoc service: ~p", [Reason, ServiceId]), {stop, normal, State}; @@ -225,12 +230,14 @@ handle_request(#{<<"id">> := Id, <<"method">> := <<"register">>, <<"params">> := Packet = json_error(Id, -1, <<"service not running">>), ok = gen_tcp:send(Socket, <>), {stop, normal, State}; - Pid when is_pid(Pid) -> - case efka_service:attach_channel(Pid, self()) of + ServicePid when is_pid(ServicePid) -> + case efka_service:attach_channel(ServicePid, self()) of ok -> Packet = json_result(Id, <<"ok">>), + erlang:monitor(process, ServicePid), + ok = gen_tcp:send(Socket, <>), - {ok, State#state{service_id = ServiceId, service_pid = Pid, is_registered = true}}; + {ok, State#state{service_id = ServiceId, service_pid = ServicePid, is_registered = true}}; {error, Error} -> lager:warning("[efka_tcp_channel] service_id: ~p, attach_channel get error: ~p", [ServiceId, Error]), Packet = json_error(Id, -1, Error),