From a690925a2889623873c2bd9c59fbe25ec09c70ca Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Mon, 22 Sep 2025 15:44:21 +0800 Subject: [PATCH] fix efka --- apps/efka/src/ws_channel.erl | 26 ++++++++------------------ 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/apps/efka/src/ws_channel.erl b/apps/efka/src/ws_channel.erl index c154bb8..51b816e 100644 --- a/apps/efka/src/ws_channel.erl +++ b/apps/efka/src/ws_channel.erl @@ -17,15 +17,6 @@ %% 最大的等待时间 -define(PENDING_TIMEOUT, 10 * 1000). -%% 服务注册 --define(PACKET_REQUEST, 16#01). -%% 消息响应 --define(PACKET_RESPONSE, 16#02). -%% 上传数据 --define(PACKET_PUSH, 16#03). - --define(PACKET_PUB, 16#04). - -record(state, { service_pid :: undefined | pid(), is_registered = false :: boolean() @@ -43,7 +34,10 @@ websocket_init(_State) -> %% 初始状态为true {ok, #state{}}. -websocket_handle({binary, <>}, State) -> +websocket_handle(ping, State) -> + {reply, pong, State}; + +websocket_handle({binary, Data}, State) -> Request = jiffy:decode(Data, [return_maps]), lager:debug("[ws_channle] get request: ~p", [Request]), handle_request(Request, State); @@ -54,18 +48,14 @@ websocket_handle(Info, State) -> %% 订阅的消息 websocket_info({topic_broadcast, Topic, Content}, State = #state{}) -> - Packet = jiffy:encode(#{<<"topic">> => Topic, <<"content">> => Content}, [force_utf8]), - Reply = <>, - {reply, {binary, Reply}, State}; + Req = jiffy:encode(#{<<"method">> => <<"publish">>, <<"params">> => #{<<"topic">> => Topic, <<"content">> => Content}}, [force_utf8]), + {reply, {binary, Req}, State}; %% service进程关闭 websocket_info({'DOWN', _Ref, process, ServicePid, Reason}, State = #state{service_pid = ServicePid}) -> lager:debug("[ws_channel] container_pid: ~p, exited: ~p", [ServicePid, Reason]), {stop, State#state{service_pid = undefined}}; -websocket_info({timeout, _, {stop, Reason}}, State) -> - lager:debug("[ws_channel] the channel will be closed with reason: ~p", [Reason]), - {stop, State}; %% 处理关闭信号 websocket_info({stop, Reason}, State) -> lager:debug("[ws_channel] the channel will be closed with reason: ~p", [Reason]), @@ -93,7 +83,7 @@ handle_request(#{<<"id">> := Id, <<"method">> := <<"register">>, <<"params">> := Reply = json_result(Id, <<"ok">>), erlang:monitor(process, ServicePid), - {reply, {binary, <>}, State#state{service_pid = ServicePid, is_registered = true}}; + {reply, {binary, Reply}, State#state{service_pid = ServicePid, is_registered = true}}; {error, Error} -> lager:warning("[ws_channel] service_id: ~p, attach_channel get error: ~p", [ServiceId, Error]), {stop, State} @@ -107,7 +97,7 @@ handle_request(#{<<"id">> := Id, <<"method">> := <<"subscribe">>, <<"params">> : {error, Reason} -> json_error(Id, -1, Reason) end, - {reply, {binary, <>}, State}; + {reply, {binary, Reply}, State}; %% 数据项 handle_request(#{<<"id">> := 0, <<"method">> := <<"metric_data">>,