This commit is contained in:
anlicheng 2025-06-03 19:37:03 +08:00
parent ad38e22d4a
commit ab2f53a7f7
5 changed files with 22 additions and 6 deletions

View File

@ -343,7 +343,7 @@ handle_event(info, {server_command, ?COMMAND_AUTH, <<Auth:8>>}, StateName, State
{next_state, ?STATE_RESTRICTED, State}
end;
%%
%% Pub/Sub机制
handle_event(info, {server_pub, Topic, Content}, ?STATE_ACTIVATED, State) ->
lager:debug("[efka_agent] get pub topic: ~p, content: ~p", [Topic, Content]),
%%

View File

@ -162,6 +162,7 @@ handle_cast({metric_data, DeviceUUID, LineProtocolData}, State = #state{service_
handle_cast({send_event, EventType, Params}, State = #state{service_id = ServiceId}) ->
efka_agent:event(ServiceId, EventType, Params),
lager:debug("[efka_service] send_event, service_id: ~p, event_type: ~p, params: ~p", [ServiceId, EventType, Params]),
{noreply, State};
%%

View File

@ -100,9 +100,8 @@ handle_cast({subscribe, Topic, SubscriberPid}, State = #state{subscribers = Subs
{noreply, State}
end;
%%
%%
handle_cast({publish, Topic, Content}, State = #state{subscribers = Subscribers}) ->
%% 广
MatchedSubscribers = match_subscribers(Subscribers, Topic),
lists:foreach(fun(#subscriber{subscriber_pid = SubscriberPid}) ->
SubscriberPid ! {topic_broadcast, Topic, Content}

View File

@ -37,6 +37,15 @@ init([]) ->
modules => ['efka_inetd_task_log']
},
#{
id => 'efka_subscription',
start => {'efka_subscription', start_link, []},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['efka_subscription']
},
#{
id => 'efka_inetd',
start => {'efka_inetd', start_link, []},

View File

@ -188,6 +188,11 @@ handle_info({topic_broadcast, Topic, Content}, State = #state{socket = Socket})
{noreply, State};
%% service进程关闭
handle_info({'DOWN', _Ref, process, ServicePid, Reason}, State = #state{service_pid = ServicePid}) ->
lager:debug("[tcp_channel] service_pid: ~p, exited: ~p", [ServicePid, Reason]),
{stop, normal, State#state{service_pid = undefined}};
handle_info({tcp_error, Socket, Reason}, State = #state{socket = Socket, service_id = ServiceId}) ->
lager:debug("[tcp_channel] tcp_error: ~p, assoc service: ~p", [Reason, ServiceId]),
{stop, normal, State};
@ -225,12 +230,14 @@ handle_request(#{<<"id">> := Id, <<"method">> := <<"register">>, <<"params">> :=
Packet = json_error(Id, -1, <<"service not running">>),
ok = gen_tcp:send(Socket, <<?PACKET_RESPONSE:8, Packet/binary>>),
{stop, normal, State};
Pid when is_pid(Pid) ->
case efka_service:attach_channel(Pid, self()) of
ServicePid when is_pid(ServicePid) ->
case efka_service:attach_channel(ServicePid, self()) of
ok ->
Packet = json_result(Id, <<"ok">>),
erlang:monitor(process, ServicePid),
ok = gen_tcp:send(Socket, <<?PACKET_RESPONSE:8, Packet/binary>>),
{ok, State#state{service_id = ServiceId, service_pid = Pid, is_registered = true}};
{ok, State#state{service_id = ServiceId, service_pid = ServicePid, is_registered = true}};
{error, Error} ->
lager:warning("[efka_tcp_channel] service_id: ~p, attach_channel get error: ~p", [ServiceId, Error]),
Packet = json_error(Id, -1, Error),