diff --git a/apps/efka/include/efka_tables.hrl b/apps/efka/include/efka_tables.hrl index 0f42a03..895d1b6 100644 --- a/apps/efka/include/efka_tables.hrl +++ b/apps/efka/include/efka_tables.hrl @@ -14,8 +14,8 @@ tar_url :: binary(), %% 工作目录 root_dir :: string(), - params :: binary(), - metrics :: binary(), + %% 配置信息 + config_json :: binary(), %% 状态: 0: 停止, 1: 运行中 status = 0 }). diff --git a/apps/efka/src/client/efka_client.erl b/apps/efka/src/client/efka_client.erl deleted file mode 100644 index aefa022..0000000 --- a/apps/efka/src/client/efka_client.erl +++ /dev/null @@ -1,324 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 28. 8月 2023 15:39 -%%%------------------------------------------------------------------- --module(efka_client). --author("aresei"). - --behaviour(gen_server). - -%% 请求超时时间 --define(EFKA_REQUEST_TIMEOUT, 5000). - -%% 消息类型 - -%% 服务注册 --define(PACKET_REQUEST, 16#01). -%% 消息响应 --define(PACKET_RESPONSE, 16#02). -%% 上传数据 --define(PACKET_PUSH, 16#03). - --define(PACKET_PUB, 16#04). - -%% API --export([start_link/3]). --export([device_offline/1, device_online/1]). --export([send_metric_data/4, request_config/0, send_event/2, controller_process/1, subscribe/1]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --record(state, { - packet_id = 1 :: integer(), - host :: string(), - port :: integer(), - %% 请求后未完成的请求 - inflight = #{} :: map(), - socket :: gen_tcp:socket(), - controller_process :: pid() | undefined -}). - --spec controller_process(ControllerPid :: pid()) -> ok. -controller_process(ControllerPid) when is_pid(ControllerPid) -> - gen_server:call(?MODULE, {controller_process, ControllerPid}). - --spec send_metric_data(DeviceUUID :: binary(), Measurement :: binary(), Tags :: map(), Fields :: map()) -> no_return(). -send_metric_data(DeviceUUID, Measurement, Tags, Fields) when is_binary(DeviceUUID), is_binary(Measurement), is_map(Fields), is_map(Tags) -> - gen_server:cast(?MODULE, {send_metric_data, DeviceUUID, Measurement, Tags, Fields}). - --spec subscribe(Topic :: binary()) -> no_return(). -subscribe(Topic) when is_binary(Topic) -> - gen_server:cast(?MODULE, {subscribe, Topic}). - -%% efka_server为了统一,r对象为字符串;需要2次json_decode --spec request_config() -> {ok, Result :: list() | map()} | {error, Reason :: any()}. -request_config() -> - {ok, Ref} = gen_server:call(?MODULE, {request_config, self()}), - receive - {response, Ref, {ok, Reply}} -> - Config = jiffy:decode(Reply, [return_maps]), - {ok, Config}; - {response, Ref, {error, Reason}} -> - {error, Reason} - after 5000 -> - {error, timeout} - end. - --spec device_offline(DeviceUUID :: binary()) -> no_return(). -device_offline(DeviceUUID) when is_binary(DeviceUUID) -> - EventBody = jiffy:encode(#{<<"device_uuid">> => DeviceUUID, <<"status">> => 0}, [force_utf8]), - send_event(1, EventBody). - --spec device_online(DeviceUUID :: binary()) -> no_return(). -device_online(DeviceUUID) when is_binary(DeviceUUID) -> - EventBody = jiffy:encode(#{<<"device_uuid">> => DeviceUUID, <<"status">> => 1}, [force_utf8]), - send_event(1, EventBody). - --spec send_event(EventType :: integer(), Params :: binary()) -> no_return(). -send_event(EventType, Params) when is_integer(EventType), is_binary(Params) -> - gen_server:cast(?MODULE, {send_event, EventType, Params}). - -%%%=================================================================== -%%% API -%%%=================================================================== - -%% @doc Spawns the server and registers the local name (unique) --spec(start_link(ServiceId :: binary(), Host :: string(), Port :: integer()) -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(ServiceId, Host, Port) when is_binary(ServiceId), is_list(Host), is_integer(Port) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [ServiceId, Host, Port], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([ServiceId, Host, Port]) -> - {ok, Socket} = gen_tcp:connect(Host, Port, [binary, {packet, 4}, {active, true}]), - ok = gen_tcp:controlling_process(Socket, self()), - - PacketId = 1, - Packet = jiffy:encode(#{ - <<"id">> => PacketId, - <<"method">> => <<"register">>, - <<"params">> => #{<<"service_id">> => ServiceId} - }, [force_utf8]), - - ok = gen_tcp:send(Socket, <>), - lager:debug("[efka_client] will send packet: ~p", [Packet]), - receive - {tcp, Socket, <>} -> - case catch jiffy:decode(Data, [return_maps]) of - #{<<"id">> := PacketId, <<"result">> := <<"ok">>} -> - {ok, #state{packet_id = PacketId + 1, host = Host, port = Port, socket = Socket}}; - #{<<"id">> := PacketId, <<"error">> := #{<<"code">> := Code, <<"message">> := Error}} -> - {stop, {error, {Code, Error}}} - end - after 5000 -> - {stop, register_timeout} - end. - -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -%% 设置主动推送消息的接受进程 -handle_call({controller_process, ControllerPid}, _From, State) -> - {reply, ok, State#state{controller_process = ControllerPid}}; - -%% done -handle_call({request_config, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - Packet = jiffy:encode(#{<<"id">> => PacketId, <<"method">> => <<"request_config">>}, [force_utf8]), - ok = gen_tcp:send(Socket, <>), - - erlang:start_timer(?EFKA_REQUEST_TIMEOUT, self(), {request_timeout, PacketId}), - - Ref = make_ref(), - {reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -%% done -handle_cast({send_metric_data, DeviceUUID, Measurement, Tags, Fields}, State = #state{socket = Socket}) -> - %% 基于Line Protocol实现数据的传输 - Point = efka_point:new(Measurement, Tags, Fields, efka_util:timestamp()), - Body = efka_point:normalized(Point), - - Packet = jiffy:encode(#{ - <<"id">> => 0, - <<"method">> => <<"metric_data">>, - <<"params">> => #{ - <<"device_uuid">> => DeviceUUID, - <<"metric">> => Body - } - }, [force_utf8]), - ok = gen_tcp:send(Socket, <>), - - {noreply, State}; - -%% done -handle_cast({send_event, EventType, Body}, State = #state{socket = Socket}) -> - Packet = jiffy:encode(#{ - <<"id">> => 0, - <<"method">> => <<"event">>, - <<"params">> => #{ - <<"event_type">> => EventType, - <<"body">> => Body - } - }, [force_utf8]), - ok = gen_tcp:send(Socket, <>), - - {noreply, State}; - -handle_cast({subscribe, Topic}, State = #state{socket = Socket}) -> - Packet = jiffy:encode(#{ - <<"id">> => 0, - <<"method">> => <<"subscribe">>, - <<"params">> => #{ - <<"topic">> => Topic - } - }, [force_utf8]), - - ok = gen_tcp:send(Socket, <>), - {noreply, State}; - -handle_cast(_Info, State = #state{}) -> - {noreply, State}. - -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). - -%% 收到请求的响应, client主动向efka发送的异步请求的响应 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, inflight = Inflight}) -> - case jiffy:decode(Packet, [return_maps]) of - #{<<"id">> := Id, <<"result">> := Result} -> - case maps:take(Id, Inflight) of - error -> - {noreply, State}; - {{Ref, ReceiverPid}, NInflight} -> - ReceiverPid ! {response, Ref, {ok, Result}}, - {noreply, State#state{inflight = NInflight}} - end; - #{<<"id">> := Id, <<"error">> := #{<<"code">> := Code, <<"message">> := Message}} -> - case maps:take(Id, Inflight) of - error -> - {noreply, State}; - {{Ref, ReceiverPid}, NInflight} -> - ReceiverPid ! {response, Ref, {error, {Code, Message}}}, - {noreply, State#state{inflight = NInflight}} - end - end; - -%% 请求超时 -handle_info({timeout, _, {request_timeout, PacketId}}, State = #state{inflight = Inflight}) -> - case maps:take(PacketId, Inflight) of - error -> - {noreply, State}; - {{Ref, ReceiverPid}, NInflight} -> - ReceiverPid ! {response, Ref, {error, {-1, <<"request timeout">>}}}, - {noreply, State#state{inflight = NInflight}} - end; - -%% 收到efka推送的参数设置, 必须处理该消息;不设置超时 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) -> - case jiffy:decode(Packet, [return_maps]) of - #{<<"id">> := Id, <<"method">> := <<"push_config">>, <<"params">> := #{<<"config">> := ConfigJson}} -> - Ref = make_ref(), - ControllerPid ! {push_config, self(), Ref, ConfigJson}, - Reply = - receive - {push_config_reply, Ref, ok} -> - #{<<"id">> => Id, <<"result">> => <<"ok">>}; - {push_config_reply, Ref, {error, Reason}} when is_binary(Reason) -> - #{<<"id">> => Id, <<"error">> => #{<<"code">> => -1, <<"message">> => Reason}} - end, - JsonReply = jiffy:encode(Reply, [force_utf8]), - ok = gen_tcp:send(Socket, <>), - {noreply, State}; - - #{<<"id">> := Id, <<"method">> := <<"invoke">>, <<"params">> := #{<<"payload">> := Payload}} -> - Ref = make_ref(), - ControllerPid ! {invoke, self(), Ref, Payload}, - Reply = - receive - {invoke_reply, Ref, ok} -> - #{<<"id">> => Id, <<"result">> => <<"ok">>}; - {invoke_reply, Ref, {ok, Result}} -> - #{<<"id">> => Id, <<"result">> => Result}; - {invoke_reply, Ref, {error, Reason}} when is_binary(Reason) -> - #{<<"id">> => Id, <<"error">> => #{<<"code">> => -1, <<"message">> => Reason}} - end, - JsonReply = jiffy:encode(Reply, [force_utf8]), - ok = gen_tcp:send(Socket, <>), - {noreply, State} - end; - -%% pub/sub的消息 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) -> - #{<<"topic">> := Topic, <<"content">> := Content} = jiffy:decode(Packet, [return_maps]), - ControllerPid ! {pub, Topic, Content}, - {noreply, State}; - -%% 其他消息为非法消息 -handle_info({tcp, Socket, Packet}, State = #state{socket = Socket}) -> - lager:debug("[efka_client] get unknown packet: ~p", [Packet]), - {noreply, State}; - -handle_info({tcp_closed, Socket}, State = #state{socket = Socket}) -> - {stop, tcp_closed, State}. - -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(_Reason, _State = #state{socket = Socket}) -> - gen_tcp:close(Socket), - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== - -%% 采用32位编码 --spec next_packet_id(PacketId :: integer()) -> NextPacketId :: integer(). -next_packet_id(PacketId) when PacketId >= 4294967295 -> - 1; -next_packet_id(PacketId) -> - PacketId + 1. - -%%%=================================================================== -%%% simple callbacks -%%%=================================================================== \ No newline at end of file diff --git a/apps/efka/src/client/efka_client_broker.erl b/apps/efka/src/client/efka_client_broker.erl deleted file mode 100644 index 2c65e42..0000000 --- a/apps/efka/src/client/efka_client_broker.erl +++ /dev/null @@ -1,108 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author anlicheng -%%% @copyright (C) 2025, -%%% @doc -%%% -%%% @end -%%% Created : 20. 5月 2025 15:09 -%%%------------------------------------------------------------------- --module(efka_client_broker). --author("anlicheng"). - --behaviour(gen_server). - -%% API --export([start_link/0]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - --define(SERVER, ?MODULE). - --record(state, { - -}). - -%%%=================================================================== -%%% API -%%%=================================================================== - -%% @doc Spawns the server and registers the local name (unique) --spec(start_link() -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([]) -> - efka_client:start_link(<<"service1234">>, "localhost", 18088), - efka_client:controller_process(self()), - {ok, #state{}}. - -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call(_Request, _From, State = #state{}) -> - {reply, ok, State}. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_cast(_Request, State = #state{}) -> - {noreply, State}. - -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_info({push_config, ReceiverPid, Ref, Config}, State = #state{}) -> - lager:debug("[efka_client_broker] get push_config: ~p", [jiffy:decode(Config, [return_maps])]), - ReceiverPid ! {push_config_reply, Ref, ok}, - {noreply, State}; -handle_info({invoke, ReceiverPid, Ref, Payload}, State = #state{}) -> - lager:debug("[efka_client_broker] get invoke: ~p", [Payload]), - ReceiverPid ! {invoke_reply, Ref, {ok, <<"yes invoke me">>}}, - {noreply, State}. - -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(_Reason, _State = #state{}) -> - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== diff --git a/apps/efka/src/client/efka_point.erl b/apps/efka/src/client/efka_point.erl deleted file mode 100644 index 11947a0..0000000 --- a/apps/efka/src/client/efka_point.erl +++ /dev/null @@ -1,49 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 30. 5月 2023 11:28 -%%%------------------------------------------------------------------- --module(efka_point). --author("aresei"). - --record(point, { - measurement, - tags = [], - fields = [], - time = 0 :: integer() -}). - -%% API --export([new/4, normalized/1]). - --spec new(Measurement :: binary(), Tags :: map(), Fields :: map(), Timestamp :: integer()) -> #point{}. -new(Measurement, Tags, Fields, Timestamp) when is_binary(Measurement), is_map(Tags), is_map(Fields), is_integer(Timestamp) -> - #point{measurement = Measurement, tags = Tags, fields = Fields, time = Timestamp}. - --spec normalized(Point :: #point{}) -> binary(). -normalized(#point{measurement = Measurement, tags = Tags, fields = Fields, time = Time}) -> - NTags = lists:map(fun({N, V}) -> <> end, maps:to_list(Tags)), - NFields = lists:map(fun({K, V}) -> <> end, maps:to_list(Fields)), - - TagItems = lists:join(<<",">>, [Measurement | NTags]), - FieldItems = lists:join(<<",">>, NFields), - - erlang:iolist_to_binary([TagItems, <<" ">>, FieldItems, <<" ">>, integer_to_binary(Time)]). - -field_val(V) when is_integer(V) -> - integer_to_binary(V); -field_val(V) when is_float(V) -> - %% 默认按照浮点数表示 - S = float_to_list(V, [{decimals, 6}, compact]), - list_to_binary(S); -field_val(V) when is_binary(V) -> - <<$", V/binary, $">>; -field_val(true) -> - <<"true">>; -field_val(false) -> - <<"false">>; -field_val(_Other) -> - erlang:error(unsupported_type). \ No newline at end of file diff --git a/apps/efka/src/efka_inetd_task.erl b/apps/efka/src/efka_inetd_task.erl index 24e2276..3bb455a 100644 --- a/apps/efka/src/efka_inetd_task.erl +++ b/apps/efka/src/efka_inetd_task.erl @@ -131,8 +131,7 @@ do_deploy(TaskId, ServiceRootDir, ServiceId, TarUrl) when is_integer(TaskId), is tar_url = TarUrl, %% 工作目录 root_dir = ServiceRootDir, - params = <<"">>, - metrics = <<"">>, + config_json = <<"">>, %% 状态: 0: 停止, 1: 运行中 status = 0 }), diff --git a/apps/efka/src/efka_service.erl b/apps/efka/src/efka_service.erl index 8c58075..79f58a3 100644 --- a/apps/efka/src/efka_service.erl +++ b/apps/efka/src/efka_service.erl @@ -124,10 +124,11 @@ init([ServiceId]) -> {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). %% 绑定channel -handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = OldChannelPid}) -> +handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = OldChannelPid, service_id = ServiceId}) -> case is_pid(OldChannelPid) andalso is_process_alive(OldChannelPid) of false -> erlang:monitor(process, ChannelPid), + lager:debug("[efka_service] service_id: ~p, channel attched", [ServiceId]), {reply, ok, State#state{channel_pid = ChannelPid}}; true -> {reply, {error, <<"channel exists">>}, State} @@ -135,8 +136,12 @@ handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = Ol %% 请求参数项 done handle_call(request_config, _From, State = #state{service_id = ServiceId}) -> - Params = service_model:get_params(ServiceId), - {reply, {ok, Params}, State}; + case service_model:get_config_json(ServiceId) of + {ok, ConfigJson} -> + {reply, {ok, ConfigJson}, State}; + error -> + {reply, {ok, <<>>}, State} + end; handle_call(_Request, _From, State = #state{}) -> {reply, ok, State}. @@ -148,6 +153,7 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). handle_cast({metric_data, DeviceUUID, LineProtocolData}, State = #state{service_id = ServiceId}) -> + lager:debug("[efka_service] metric_data service_id: ~p, device_uuid: ~p, metric data: ~p", [ServiceId, DeviceUUID, LineProtocolData]), efka_agent:metric_data(ServiceId, DeviceUUID, LineProtocolData), {noreply, State}; diff --git a/apps/efka/src/efka_sup.erl b/apps/efka/src/efka_sup.erl index 923c37a..c6db84e 100644 --- a/apps/efka/src/efka_sup.erl +++ b/apps/efka/src/efka_sup.erl @@ -82,6 +82,7 @@ init([]) -> modules => ['efka_service_sup'] } ], + {ok, {SupFlags, ChildSpecs}}. %% internal functions diff --git a/apps/efka/src/efka_tcp_channel.erl b/apps/efka/src/efka_tcp_channel.erl index ddf45da..0c5b31d 100644 --- a/apps/efka/src/efka_tcp_channel.erl +++ b/apps/efka/src/efka_tcp_channel.erl @@ -73,7 +73,7 @@ start_link(Socket) -> {stop, Reason :: term()} | ignore). init([Socket]) -> ok = inet:setopts(Socket, [{active, true}]), - lager:debug("[tcp_channel] get new socket: ~p", [Socket]), + lager:debug("[efka_tcp_channel] get micro service socket: ~p", [Socket]), {ok, #state{socket = Socket}}. %% @private @@ -122,7 +122,7 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -%% 处理client主动的请求 +%% 处理micro-client:request => efka 主动的请求 handle_info({tcp, Socket, <>}, State = #state{socket = Socket}) -> Request = jiffy:decode(Data, [return_maps]), case handle_request(Request, State) of @@ -132,7 +132,7 @@ handle_info({tcp, Socket, <>}, State = #state{so {stop, Reason, NewState} end; -%% 处理client的响应 +%% 处理micro-client:response => efka 的响应 handle_info({tcp, Socket, <>}, State = #state{socket = Socket, inflight = Inflight}) -> Resp = jiffy:decode(Data, [return_maps]), case Resp of @@ -188,9 +188,12 @@ handle_info({topic_broadcast, Topic, Content}, State = #state{socket = Socket}) {noreply, State}; -handle_info(Info, State = #state{}) -> - lager:debug("[tcp_channel] get info: ~p", [Info]), - {noreply, State}. +handle_info({tcp_error, Socket, Reason}, State = #state{socket = Socket, service_id = ServiceId}) -> + lager:debug("[tcp_channel] tcp_error: ~p, assoc service: ~p", [Reason, ServiceId]), + {stop, normal, State}; +handle_info({tcp_closed, Socket}, State = #state{socket = Socket, service_id = ServiceId}) -> + lager:debug("[tcp_channel] tcp_closed: ~p, assoc service: ~p", [Socket, ServiceId]), + {stop, normal, State}. %% @private %% @doc This function is called by a gen_server when it is about to diff --git a/apps/efka/src/mnesia/service_model.erl b/apps/efka/src/mnesia/service_model.erl index 721d39b..d5f3fa7 100644 --- a/apps/efka/src/mnesia/service_model.erl +++ b/apps/efka/src/mnesia/service_model.erl @@ -16,7 +16,7 @@ %% API -export([create_table/0]). -export([insert/1, get_all_services/0, get_all_service_ids/0, get_running_services/0]). --export([get_metrics/1, get_params/1, set_metrics/2, set_params/2, get_service/1, get_status/1, change_status/2]). +-export([get_config_json/1, set_config/2, get_service/1, get_status/1, change_status/2]). -export([display_services/0]). create_table() -> @@ -52,14 +52,14 @@ change_status(ServiceId, NewStatus) when is_binary(ServiceId), is_integer(NewSta {error, Reason} end. --spec set_params(ServiceId :: binary(), Params :: binary()) -> ok | {error, Reason :: any()}. -set_params(ServiceId, Params) when is_binary(ServiceId), is_binary(Params) -> +-spec set_config(ServiceId :: binary(), ConfigJson :: binary()) -> ok | {error, Reason :: any()}. +set_config(ServiceId, ConfigJson) when is_binary(ServiceId), is_binary(ConfigJson) -> Fun = fun() -> case mnesia:read(?TAB, ServiceId, write) of [] -> mnesia:abort(<<"service not found">>); [S] -> - mnesia:write(?TAB, S#service{params = Params}, write) + mnesia:write(?TAB, S#service{config_json = ConfigJson}, write) end end, case mnesia:transaction(Fun) of @@ -69,39 +69,13 @@ set_params(ServiceId, Params) when is_binary(ServiceId), is_binary(Params) -> {error, Reason} end. --spec set_metrics(ServiceId :: binary(), Metrics :: binary()) -> ok | {error, Reason :: any()}. -set_metrics(ServiceId, Metrics) when is_binary(ServiceId), is_binary(Metrics) -> - Fun = fun() -> - case mnesia:read(?TAB, ServiceId, write) of - [] -> - mnesia:abort(<<"service not found">>); - [S] -> - mnesia:write(?TAB, S#service{metrics = Metrics}, write) - end - end, - case mnesia:transaction(Fun) of - {'atomic', ok} -> - ok; - {'aborted', Reason} -> - {error, Reason} - end. - --spec get_params(ServiceId :: binary()) -> Params :: binary(). -get_params(ServiceId) when is_binary(ServiceId) -> +-spec get_config_json(ServiceId :: binary()) -> error | {ok, ConfigJson :: binary()}. +get_config_json(ServiceId) when is_binary(ServiceId) -> case mnesia:dirty_read(?TAB, ServiceId) of [] -> - <<"">>; - [#service{params = Params}] -> - Params - end. - --spec get_metrics(ServiceId :: binary()) -> Params :: binary(). -get_metrics(ServiceId) when is_binary(ServiceId) -> - case mnesia:dirty_read(?TAB, ServiceId) of - [] -> - <<"">>; - [#service{metrics = Metrics}] -> - Metrics + error; + [#service{config_json = ConfigJson}] -> + {ok, ConfigJson} end. -spec get_status(ServiceId :: binary()) -> Status :: integer().