diff --git a/apps/efka/include/efka_service.hrl b/apps/efka/include/efka_service.hrl index 0df9fb3..33f7716 100644 --- a/apps/efka/include/efka_service.hrl +++ b/apps/efka/include/efka_service.hrl @@ -8,11 +8,3 @@ %%%------------------------------------------------------------------- -author("anlicheng"). -%% 服务注册 --define(PACKET_REQUEST, 16#01). -%% 消息响应 --define(PACKET_RESPONSE, 16#02). -%% 上传数据 --define(PACKET_PUSH, 16#03). - --define(PACKET_PUB, 16#04). \ No newline at end of file diff --git a/apps/efka/src/efka_sup.erl b/apps/efka/src/efka_sup.erl index f9290ec..2dd14d0 100644 --- a/apps/efka/src/efka_sup.erl +++ b/apps/efka/src/efka_sup.erl @@ -28,6 +28,15 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, ChildSpecs = [ + #{ + id => 'efka_service_sup', + start => {'efka_service_sup', start_link, []}, + restart => permanent, + shutdown => 2000, + type => supervisor, + modules => ['efka_service_sup'] + }, + #{ id => 'docker_events', start => {'docker_events', start_link, []}, diff --git a/apps/efka/src/ws_channel.erl b/apps/efka/src/ws_channel.erl index 2e30b98..c154bb8 100644 --- a/apps/efka/src/ws_channel.erl +++ b/apps/efka/src/ws_channel.erl @@ -17,6 +17,15 @@ %% 最大的等待时间 -define(PENDING_TIMEOUT, 10 * 1000). +%% 服务注册 +-define(PACKET_REQUEST, 16#01). +%% 消息响应 +-define(PACKET_RESPONSE, 16#02). +%% 上传数据 +-define(PACKET_PUSH, 16#03). + +-define(PACKET_PUB, 16#04). + -record(state, { service_pid :: undefined | pid(), is_registered = false :: boolean() @@ -36,6 +45,7 @@ websocket_init(_State) -> websocket_handle({binary, <>}, State) -> Request = jiffy:decode(Data, [return_maps]), + lager:debug("[ws_channle] get request: ~p", [Request]), handle_request(Request, State); websocket_handle(Info, State) -> @@ -50,7 +60,7 @@ websocket_info({topic_broadcast, Topic, Content}, State = #state{}) -> %% service进程关闭 websocket_info({'DOWN', _Ref, process, ServicePid, Reason}, State = #state{service_pid = ServicePid}) -> - lager:debug("[tcp_channel] container_pid: ~p, exited: ~p", [ServicePid, Reason]), + lager:debug("[ws_channel] container_pid: ~p, exited: ~p", [ServicePid, Reason]), {stop, State#state{service_pid = undefined}}; websocket_info({timeout, _, {stop, Reason}}, State) -> @@ -102,12 +112,12 @@ handle_request(#{<<"id">> := Id, <<"method">> := <<"subscribe">>, <<"params">> : %% 数据项 handle_request(#{<<"id">> := 0, <<"method">> := <<"metric_data">>, <<"params">> := #{<<"device_uuid">> := DeviceUUID, <<"route_key">> := RouteKey, <<"metric">> := Metric}}, State = #state{service_pid = ServicePid, is_registered = true}) -> - efka_container:metric_data(ServicePid, DeviceUUID, RouteKey, Metric), + efka_service:metric_data(ServicePid, DeviceUUID, RouteKey, Metric), {ok, State}; %% Event事件 handle_request(#{<<"id">> := 0, <<"method">> := <<"event">>, <<"params">> := #{<<"event_type">> := EventType, <<"body">> := Body}}, State = #state{service_pid = ServicePid, is_registered = true}) -> - efka_container:send_event(ServicePid, EventType, Body), + efka_service:send_event(ServicePid, EventType, Body), {ok, State}. -spec json_result(Id :: integer(), Result :: term()) -> binary().