fix efka
This commit is contained in:
parent
a5b647e0e2
commit
37e3040765
@ -14,8 +14,8 @@
|
|||||||
tar_url :: binary(),
|
tar_url :: binary(),
|
||||||
%% 工作目录
|
%% 工作目录
|
||||||
root_dir :: string(),
|
root_dir :: string(),
|
||||||
params :: binary(),
|
%% 配置信息
|
||||||
metrics :: binary(),
|
config_json :: binary(),
|
||||||
%% 状态: 0: 停止, 1: 运行中
|
%% 状态: 0: 停止, 1: 运行中
|
||||||
status = 0
|
status = 0
|
||||||
}).
|
}).
|
||||||
|
|||||||
@ -1,324 +0,0 @@
|
|||||||
%%%-------------------------------------------------------------------
|
|
||||||
%%% @copyright (C) 2023, <COMPANY>
|
|
||||||
%%% @doc
|
|
||||||
%%%
|
|
||||||
%%% @end
|
|
||||||
%%% Created : 28. 8月 2023 15:39
|
|
||||||
%%%-------------------------------------------------------------------
|
|
||||||
-module(efka_client).
|
|
||||||
-author("aresei").
|
|
||||||
|
|
||||||
-behaviour(gen_server).
|
|
||||||
|
|
||||||
%% 请求超时时间
|
|
||||||
-define(EFKA_REQUEST_TIMEOUT, 5000).
|
|
||||||
|
|
||||||
%% 消息类型
|
|
||||||
|
|
||||||
%% 服务注册
|
|
||||||
-define(PACKET_REQUEST, 16#01).
|
|
||||||
%% 消息响应
|
|
||||||
-define(PACKET_RESPONSE, 16#02).
|
|
||||||
%% 上传数据
|
|
||||||
-define(PACKET_PUSH, 16#03).
|
|
||||||
|
|
||||||
-define(PACKET_PUB, 16#04).
|
|
||||||
|
|
||||||
%% API
|
|
||||||
-export([start_link/3]).
|
|
||||||
-export([device_offline/1, device_online/1]).
|
|
||||||
-export([send_metric_data/4, request_config/0, send_event/2, controller_process/1, subscribe/1]).
|
|
||||||
|
|
||||||
%% gen_server callbacks
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
|
||||||
|
|
||||||
-record(state, {
|
|
||||||
packet_id = 1 :: integer(),
|
|
||||||
host :: string(),
|
|
||||||
port :: integer(),
|
|
||||||
%% 请求后未完成的请求
|
|
||||||
inflight = #{} :: map(),
|
|
||||||
socket :: gen_tcp:socket(),
|
|
||||||
controller_process :: pid() | undefined
|
|
||||||
}).
|
|
||||||
|
|
||||||
-spec controller_process(ControllerPid :: pid()) -> ok.
|
|
||||||
controller_process(ControllerPid) when is_pid(ControllerPid) ->
|
|
||||||
gen_server:call(?MODULE, {controller_process, ControllerPid}).
|
|
||||||
|
|
||||||
-spec send_metric_data(DeviceUUID :: binary(), Measurement :: binary(), Tags :: map(), Fields :: map()) -> no_return().
|
|
||||||
send_metric_data(DeviceUUID, Measurement, Tags, Fields) when is_binary(DeviceUUID), is_binary(Measurement), is_map(Fields), is_map(Tags) ->
|
|
||||||
gen_server:cast(?MODULE, {send_metric_data, DeviceUUID, Measurement, Tags, Fields}).
|
|
||||||
|
|
||||||
-spec subscribe(Topic :: binary()) -> no_return().
|
|
||||||
subscribe(Topic) when is_binary(Topic) ->
|
|
||||||
gen_server:cast(?MODULE, {subscribe, Topic}).
|
|
||||||
|
|
||||||
%% efka_server为了统一,r对象为字符串;需要2次json_decode
|
|
||||||
-spec request_config() -> {ok, Result :: list() | map()} | {error, Reason :: any()}.
|
|
||||||
request_config() ->
|
|
||||||
{ok, Ref} = gen_server:call(?MODULE, {request_config, self()}),
|
|
||||||
receive
|
|
||||||
{response, Ref, {ok, Reply}} ->
|
|
||||||
Config = jiffy:decode(Reply, [return_maps]),
|
|
||||||
{ok, Config};
|
|
||||||
{response, Ref, {error, Reason}} ->
|
|
||||||
{error, Reason}
|
|
||||||
after 5000 ->
|
|
||||||
{error, timeout}
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec device_offline(DeviceUUID :: binary()) -> no_return().
|
|
||||||
device_offline(DeviceUUID) when is_binary(DeviceUUID) ->
|
|
||||||
EventBody = jiffy:encode(#{<<"device_uuid">> => DeviceUUID, <<"status">> => 0}, [force_utf8]),
|
|
||||||
send_event(1, EventBody).
|
|
||||||
|
|
||||||
-spec device_online(DeviceUUID :: binary()) -> no_return().
|
|
||||||
device_online(DeviceUUID) when is_binary(DeviceUUID) ->
|
|
||||||
EventBody = jiffy:encode(#{<<"device_uuid">> => DeviceUUID, <<"status">> => 1}, [force_utf8]),
|
|
||||||
send_event(1, EventBody).
|
|
||||||
|
|
||||||
-spec send_event(EventType :: integer(), Params :: binary()) -> no_return().
|
|
||||||
send_event(EventType, Params) when is_integer(EventType), is_binary(Params) ->
|
|
||||||
gen_server:cast(?MODULE, {send_event, EventType, Params}).
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% API
|
|
||||||
%%%===================================================================
|
|
||||||
|
|
||||||
%% @doc Spawns the server and registers the local name (unique)
|
|
||||||
-spec(start_link(ServiceId :: binary(), Host :: string(), Port :: integer()) ->
|
|
||||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
|
||||||
start_link(ServiceId, Host, Port) when is_binary(ServiceId), is_list(Host), is_integer(Port) ->
|
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [ServiceId, Host, Port], []).
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% gen_server callbacks
|
|
||||||
%%%===================================================================
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Initializes the server
|
|
||||||
-spec(init(Args :: term()) ->
|
|
||||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term()} | ignore).
|
|
||||||
init([ServiceId, Host, Port]) ->
|
|
||||||
{ok, Socket} = gen_tcp:connect(Host, Port, [binary, {packet, 4}, {active, true}]),
|
|
||||||
ok = gen_tcp:controlling_process(Socket, self()),
|
|
||||||
|
|
||||||
PacketId = 1,
|
|
||||||
Packet = jiffy:encode(#{
|
|
||||||
<<"id">> => PacketId,
|
|
||||||
<<"method">> => <<"register">>,
|
|
||||||
<<"params">> => #{<<"service_id">> => ServiceId}
|
|
||||||
}, [force_utf8]),
|
|
||||||
|
|
||||||
ok = gen_tcp:send(Socket, <<?PACKET_REQUEST, Packet/binary>>),
|
|
||||||
lager:debug("[efka_client] will send packet: ~p", [Packet]),
|
|
||||||
receive
|
|
||||||
{tcp, Socket, <<?PACKET_RESPONSE, Data/binary>>} ->
|
|
||||||
case catch jiffy:decode(Data, [return_maps]) of
|
|
||||||
#{<<"id">> := PacketId, <<"result">> := <<"ok">>} ->
|
|
||||||
{ok, #state{packet_id = PacketId + 1, host = Host, port = Port, socket = Socket}};
|
|
||||||
#{<<"id">> := PacketId, <<"error">> := #{<<"code">> := Code, <<"message">> := Error}} ->
|
|
||||||
{stop, {error, {Code, Error}}}
|
|
||||||
end
|
|
||||||
after 5000 ->
|
|
||||||
{stop, register_timeout}
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling call messages
|
|
||||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
|
||||||
State :: #state{}) ->
|
|
||||||
{reply, Reply :: term(), NewState :: #state{}} |
|
|
||||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
%% 设置主动推送消息的接受进程
|
|
||||||
handle_call({controller_process, ControllerPid}, _From, State) ->
|
|
||||||
{reply, ok, State#state{controller_process = ControllerPid}};
|
|
||||||
|
|
||||||
%% done
|
|
||||||
handle_call({request_config, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
|
|
||||||
Packet = jiffy:encode(#{<<"id">> => PacketId, <<"method">> => <<"request_config">>}, [force_utf8]),
|
|
||||||
ok = gen_tcp:send(Socket, <<?PACKET_REQUEST, Packet/binary>>),
|
|
||||||
|
|
||||||
erlang:start_timer(?EFKA_REQUEST_TIMEOUT, self(), {request_timeout, PacketId}),
|
|
||||||
|
|
||||||
Ref = make_ref(),
|
|
||||||
{reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling cast messages
|
|
||||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
%% done
|
|
||||||
handle_cast({send_metric_data, DeviceUUID, Measurement, Tags, Fields}, State = #state{socket = Socket}) ->
|
|
||||||
%% 基于Line Protocol实现数据的传输
|
|
||||||
Point = efka_point:new(Measurement, Tags, Fields, efka_util:timestamp()),
|
|
||||||
Body = efka_point:normalized(Point),
|
|
||||||
|
|
||||||
Packet = jiffy:encode(#{
|
|
||||||
<<"id">> => 0,
|
|
||||||
<<"method">> => <<"metric_data">>,
|
|
||||||
<<"params">> => #{
|
|
||||||
<<"device_uuid">> => DeviceUUID,
|
|
||||||
<<"metric">> => Body
|
|
||||||
}
|
|
||||||
}, [force_utf8]),
|
|
||||||
ok = gen_tcp:send(Socket, <<?PACKET_REQUEST, Packet/binary>>),
|
|
||||||
|
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
%% done
|
|
||||||
handle_cast({send_event, EventType, Body}, State = #state{socket = Socket}) ->
|
|
||||||
Packet = jiffy:encode(#{
|
|
||||||
<<"id">> => 0,
|
|
||||||
<<"method">> => <<"event">>,
|
|
||||||
<<"params">> => #{
|
|
||||||
<<"event_type">> => EventType,
|
|
||||||
<<"body">> => Body
|
|
||||||
}
|
|
||||||
}, [force_utf8]),
|
|
||||||
ok = gen_tcp:send(Socket, <<?PACKET_REQUEST, Packet/binary>>),
|
|
||||||
|
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
handle_cast({subscribe, Topic}, State = #state{socket = Socket}) ->
|
|
||||||
Packet = jiffy:encode(#{
|
|
||||||
<<"id">> => 0,
|
|
||||||
<<"method">> => <<"subscribe">>,
|
|
||||||
<<"params">> => #{
|
|
||||||
<<"topic">> => Topic
|
|
||||||
}
|
|
||||||
}, [force_utf8]),
|
|
||||||
|
|
||||||
ok = gen_tcp:send(Socket, <<?PACKET_REQUEST, Packet/binary>>),
|
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
handle_cast(_Info, State = #state{}) ->
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling all non call/cast messages
|
|
||||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
|
|
||||||
%% 收到请求的响应, client主动向efka发送的异步请求的响应
|
|
||||||
handle_info({tcp, Socket, <<?PACKET_RESPONSE, Packet/binary>>}, State = #state{socket = Socket, inflight = Inflight}) ->
|
|
||||||
case jiffy:decode(Packet, [return_maps]) of
|
|
||||||
#{<<"id">> := Id, <<"result">> := Result} ->
|
|
||||||
case maps:take(Id, Inflight) of
|
|
||||||
error ->
|
|
||||||
{noreply, State};
|
|
||||||
{{Ref, ReceiverPid}, NInflight} ->
|
|
||||||
ReceiverPid ! {response, Ref, {ok, Result}},
|
|
||||||
{noreply, State#state{inflight = NInflight}}
|
|
||||||
end;
|
|
||||||
#{<<"id">> := Id, <<"error">> := #{<<"code">> := Code, <<"message">> := Message}} ->
|
|
||||||
case maps:take(Id, Inflight) of
|
|
||||||
error ->
|
|
||||||
{noreply, State};
|
|
||||||
{{Ref, ReceiverPid}, NInflight} ->
|
|
||||||
ReceiverPid ! {response, Ref, {error, {Code, Message}}},
|
|
||||||
{noreply, State#state{inflight = NInflight}}
|
|
||||||
end
|
|
||||||
end;
|
|
||||||
|
|
||||||
%% 请求超时
|
|
||||||
handle_info({timeout, _, {request_timeout, PacketId}}, State = #state{inflight = Inflight}) ->
|
|
||||||
case maps:take(PacketId, Inflight) of
|
|
||||||
error ->
|
|
||||||
{noreply, State};
|
|
||||||
{{Ref, ReceiverPid}, NInflight} ->
|
|
||||||
ReceiverPid ! {response, Ref, {error, {-1, <<"request timeout">>}}},
|
|
||||||
{noreply, State#state{inflight = NInflight}}
|
|
||||||
end;
|
|
||||||
|
|
||||||
%% 收到efka推送的参数设置, 必须处理该消息;不设置超时
|
|
||||||
handle_info({tcp, Socket, <<?PACKET_PUSH, Packet/binary>>}, State = #state{socket = Socket, controller_process = ControllerPid}) ->
|
|
||||||
case jiffy:decode(Packet, [return_maps]) of
|
|
||||||
#{<<"id">> := Id, <<"method">> := <<"push_config">>, <<"params">> := #{<<"config">> := ConfigJson}} ->
|
|
||||||
Ref = make_ref(),
|
|
||||||
ControllerPid ! {push_config, self(), Ref, ConfigJson},
|
|
||||||
Reply =
|
|
||||||
receive
|
|
||||||
{push_config_reply, Ref, ok} ->
|
|
||||||
#{<<"id">> => Id, <<"result">> => <<"ok">>};
|
|
||||||
{push_config_reply, Ref, {error, Reason}} when is_binary(Reason) ->
|
|
||||||
#{<<"id">> => Id, <<"error">> => #{<<"code">> => -1, <<"message">> => Reason}}
|
|
||||||
end,
|
|
||||||
JsonReply = jiffy:encode(Reply, [force_utf8]),
|
|
||||||
ok = gen_tcp:send(Socket, <<?PACKET_RESPONSE:8, JsonReply/binary>>),
|
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
#{<<"id">> := Id, <<"method">> := <<"invoke">>, <<"params">> := #{<<"payload">> := Payload}} ->
|
|
||||||
Ref = make_ref(),
|
|
||||||
ControllerPid ! {invoke, self(), Ref, Payload},
|
|
||||||
Reply =
|
|
||||||
receive
|
|
||||||
{invoke_reply, Ref, ok} ->
|
|
||||||
#{<<"id">> => Id, <<"result">> => <<"ok">>};
|
|
||||||
{invoke_reply, Ref, {ok, Result}} ->
|
|
||||||
#{<<"id">> => Id, <<"result">> => Result};
|
|
||||||
{invoke_reply, Ref, {error, Reason}} when is_binary(Reason) ->
|
|
||||||
#{<<"id">> => Id, <<"error">> => #{<<"code">> => -1, <<"message">> => Reason}}
|
|
||||||
end,
|
|
||||||
JsonReply = jiffy:encode(Reply, [force_utf8]),
|
|
||||||
ok = gen_tcp:send(Socket, <<?PACKET_RESPONSE:8, JsonReply/binary>>),
|
|
||||||
{noreply, State}
|
|
||||||
end;
|
|
||||||
|
|
||||||
%% pub/sub的消息
|
|
||||||
handle_info({tcp, Socket, <<?PACKET_PUB, Packet/binary>>}, State = #state{socket = Socket, controller_process = ControllerPid}) ->
|
|
||||||
#{<<"topic">> := Topic, <<"content">> := Content} = jiffy:decode(Packet, [return_maps]),
|
|
||||||
ControllerPid ! {pub, Topic, Content},
|
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
%% 其他消息为非法消息
|
|
||||||
handle_info({tcp, Socket, Packet}, State = #state{socket = Socket}) ->
|
|
||||||
lager:debug("[efka_client] get unknown packet: ~p", [Packet]),
|
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
handle_info({tcp_closed, Socket}, State = #state{socket = Socket}) ->
|
|
||||||
{stop, tcp_closed, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc This function is called by a gen_server when it is about to
|
|
||||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
|
||||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
|
||||||
%% with Reason. The return value is ignored.
|
|
||||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
|
||||||
State :: #state{}) -> term()).
|
|
||||||
terminate(_Reason, _State = #state{socket = Socket}) ->
|
|
||||||
gen_tcp:close(Socket),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Convert process state when code is changed
|
|
||||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
|
||||||
Extra :: term()) ->
|
|
||||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
|
||||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% Internal functions
|
|
||||||
%%%===================================================================
|
|
||||||
|
|
||||||
%% 采用32位编码
|
|
||||||
-spec next_packet_id(PacketId :: integer()) -> NextPacketId :: integer().
|
|
||||||
next_packet_id(PacketId) when PacketId >= 4294967295 ->
|
|
||||||
1;
|
|
||||||
next_packet_id(PacketId) ->
|
|
||||||
PacketId + 1.
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% simple callbacks
|
|
||||||
%%%===================================================================
|
|
||||||
@ -1,108 +0,0 @@
|
|||||||
%%%-------------------------------------------------------------------
|
|
||||||
%%% @author anlicheng
|
|
||||||
%%% @copyright (C) 2025, <COMPANY>
|
|
||||||
%%% @doc
|
|
||||||
%%%
|
|
||||||
%%% @end
|
|
||||||
%%% Created : 20. 5月 2025 15:09
|
|
||||||
%%%-------------------------------------------------------------------
|
|
||||||
-module(efka_client_broker).
|
|
||||||
-author("anlicheng").
|
|
||||||
|
|
||||||
-behaviour(gen_server).
|
|
||||||
|
|
||||||
%% API
|
|
||||||
-export([start_link/0]).
|
|
||||||
|
|
||||||
%% gen_server callbacks
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
|
||||||
code_change/3]).
|
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
|
||||||
|
|
||||||
-record(state, {
|
|
||||||
|
|
||||||
}).
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% API
|
|
||||||
%%%===================================================================
|
|
||||||
|
|
||||||
%% @doc Spawns the server and registers the local name (unique)
|
|
||||||
-spec(start_link() ->
|
|
||||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
|
||||||
start_link() ->
|
|
||||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% gen_server callbacks
|
|
||||||
%%%===================================================================
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Initializes the server
|
|
||||||
-spec(init(Args :: term()) ->
|
|
||||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term()} | ignore).
|
|
||||||
init([]) ->
|
|
||||||
efka_client:start_link(<<"service1234">>, "localhost", 18088),
|
|
||||||
efka_client:controller_process(self()),
|
|
||||||
{ok, #state{}}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling call messages
|
|
||||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
|
||||||
State :: #state{}) ->
|
|
||||||
{reply, Reply :: term(), NewState :: #state{}} |
|
|
||||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_call(_Request, _From, State = #state{}) ->
|
|
||||||
{reply, ok, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling cast messages
|
|
||||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_cast(_Request, State = #state{}) ->
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling all non call/cast messages
|
|
||||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_info({push_config, ReceiverPid, Ref, Config}, State = #state{}) ->
|
|
||||||
lager:debug("[efka_client_broker] get push_config: ~p", [jiffy:decode(Config, [return_maps])]),
|
|
||||||
ReceiverPid ! {push_config_reply, Ref, ok},
|
|
||||||
{noreply, State};
|
|
||||||
handle_info({invoke, ReceiverPid, Ref, Payload}, State = #state{}) ->
|
|
||||||
lager:debug("[efka_client_broker] get invoke: ~p", [Payload]),
|
|
||||||
ReceiverPid ! {invoke_reply, Ref, {ok, <<"yes invoke me">>}},
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc This function is called by a gen_server when it is about to
|
|
||||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
|
||||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
|
||||||
%% with Reason. The return value is ignored.
|
|
||||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
|
||||||
State :: #state{}) -> term()).
|
|
||||||
terminate(_Reason, _State = #state{}) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Convert process state when code is changed
|
|
||||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
|
||||||
Extra :: term()) ->
|
|
||||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
|
||||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% Internal functions
|
|
||||||
%%%===================================================================
|
|
||||||
@ -1,49 +0,0 @@
|
|||||||
%%%-------------------------------------------------------------------
|
|
||||||
%%% @author aresei
|
|
||||||
%%% @copyright (C) 2023, <COMPANY>
|
|
||||||
%%% @doc
|
|
||||||
%%%
|
|
||||||
%%% @end
|
|
||||||
%%% Created : 30. 5月 2023 11:28
|
|
||||||
%%%-------------------------------------------------------------------
|
|
||||||
-module(efka_point).
|
|
||||||
-author("aresei").
|
|
||||||
|
|
||||||
-record(point, {
|
|
||||||
measurement,
|
|
||||||
tags = [],
|
|
||||||
fields = [],
|
|
||||||
time = 0 :: integer()
|
|
||||||
}).
|
|
||||||
|
|
||||||
%% API
|
|
||||||
-export([new/4, normalized/1]).
|
|
||||||
|
|
||||||
-spec new(Measurement :: binary(), Tags :: map(), Fields :: map(), Timestamp :: integer()) -> #point{}.
|
|
||||||
new(Measurement, Tags, Fields, Timestamp) when is_binary(Measurement), is_map(Tags), is_map(Fields), is_integer(Timestamp) ->
|
|
||||||
#point{measurement = Measurement, tags = Tags, fields = Fields, time = Timestamp}.
|
|
||||||
|
|
||||||
-spec normalized(Point :: #point{}) -> binary().
|
|
||||||
normalized(#point{measurement = Measurement, tags = Tags, fields = Fields, time = Time}) ->
|
|
||||||
NTags = lists:map(fun({N, V}) -> <<N/binary, $=, V/binary>> end, maps:to_list(Tags)),
|
|
||||||
NFields = lists:map(fun({K, V}) -> <<K/binary, $=, (field_val(V))/binary>> end, maps:to_list(Fields)),
|
|
||||||
|
|
||||||
TagItems = lists:join(<<",">>, [Measurement | NTags]),
|
|
||||||
FieldItems = lists:join(<<",">>, NFields),
|
|
||||||
|
|
||||||
erlang:iolist_to_binary([TagItems, <<" ">>, FieldItems, <<" ">>, integer_to_binary(Time)]).
|
|
||||||
|
|
||||||
field_val(V) when is_integer(V) ->
|
|
||||||
integer_to_binary(V);
|
|
||||||
field_val(V) when is_float(V) ->
|
|
||||||
%% 默认按照浮点数表示
|
|
||||||
S = float_to_list(V, [{decimals, 6}, compact]),
|
|
||||||
list_to_binary(S);
|
|
||||||
field_val(V) when is_binary(V) ->
|
|
||||||
<<$", V/binary, $">>;
|
|
||||||
field_val(true) ->
|
|
||||||
<<"true">>;
|
|
||||||
field_val(false) ->
|
|
||||||
<<"false">>;
|
|
||||||
field_val(_Other) ->
|
|
||||||
erlang:error(unsupported_type).
|
|
||||||
@ -131,8 +131,7 @@ do_deploy(TaskId, ServiceRootDir, ServiceId, TarUrl) when is_integer(TaskId), is
|
|||||||
tar_url = TarUrl,
|
tar_url = TarUrl,
|
||||||
%% 工作目录
|
%% 工作目录
|
||||||
root_dir = ServiceRootDir,
|
root_dir = ServiceRootDir,
|
||||||
params = <<"">>,
|
config_json = <<"">>,
|
||||||
metrics = <<"">>,
|
|
||||||
%% 状态: 0: 停止, 1: 运行中
|
%% 状态: 0: 停止, 1: 运行中
|
||||||
status = 0
|
status = 0
|
||||||
}),
|
}),
|
||||||
|
|||||||
@ -124,10 +124,11 @@ init([ServiceId]) ->
|
|||||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
%% 绑定channel
|
%% 绑定channel
|
||||||
handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = OldChannelPid}) ->
|
handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = OldChannelPid, service_id = ServiceId}) ->
|
||||||
case is_pid(OldChannelPid) andalso is_process_alive(OldChannelPid) of
|
case is_pid(OldChannelPid) andalso is_process_alive(OldChannelPid) of
|
||||||
false ->
|
false ->
|
||||||
erlang:monitor(process, ChannelPid),
|
erlang:monitor(process, ChannelPid),
|
||||||
|
lager:debug("[efka_service] service_id: ~p, channel attched", [ServiceId]),
|
||||||
{reply, ok, State#state{channel_pid = ChannelPid}};
|
{reply, ok, State#state{channel_pid = ChannelPid}};
|
||||||
true ->
|
true ->
|
||||||
{reply, {error, <<"channel exists">>}, State}
|
{reply, {error, <<"channel exists">>}, State}
|
||||||
@ -135,8 +136,12 @@ handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = Ol
|
|||||||
|
|
||||||
%% 请求参数项 done
|
%% 请求参数项 done
|
||||||
handle_call(request_config, _From, State = #state{service_id = ServiceId}) ->
|
handle_call(request_config, _From, State = #state{service_id = ServiceId}) ->
|
||||||
Params = service_model:get_params(ServiceId),
|
case service_model:get_config_json(ServiceId) of
|
||||||
{reply, {ok, Params}, State};
|
{ok, ConfigJson} ->
|
||||||
|
{reply, {ok, ConfigJson}, State};
|
||||||
|
error ->
|
||||||
|
{reply, {ok, <<>>}, State}
|
||||||
|
end;
|
||||||
|
|
||||||
handle_call(_Request, _From, State = #state{}) ->
|
handle_call(_Request, _From, State = #state{}) ->
|
||||||
{reply, ok, State}.
|
{reply, ok, State}.
|
||||||
@ -148,6 +153,7 @@ handle_call(_Request, _From, State = #state{}) ->
|
|||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
handle_cast({metric_data, DeviceUUID, LineProtocolData}, State = #state{service_id = ServiceId}) ->
|
handle_cast({metric_data, DeviceUUID, LineProtocolData}, State = #state{service_id = ServiceId}) ->
|
||||||
|
lager:debug("[efka_service] metric_data service_id: ~p, device_uuid: ~p, metric data: ~p", [ServiceId, DeviceUUID, LineProtocolData]),
|
||||||
efka_agent:metric_data(ServiceId, DeviceUUID, LineProtocolData),
|
efka_agent:metric_data(ServiceId, DeviceUUID, LineProtocolData),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
|
|||||||
@ -82,6 +82,7 @@ init([]) ->
|
|||||||
modules => ['efka_service_sup']
|
modules => ['efka_service_sup']
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
|
|
||||||
{ok, {SupFlags, ChildSpecs}}.
|
{ok, {SupFlags, ChildSpecs}}.
|
||||||
|
|
||||||
%% internal functions
|
%% internal functions
|
||||||
|
|||||||
@ -73,7 +73,7 @@ start_link(Socket) ->
|
|||||||
{stop, Reason :: term()} | ignore).
|
{stop, Reason :: term()} | ignore).
|
||||||
init([Socket]) ->
|
init([Socket]) ->
|
||||||
ok = inet:setopts(Socket, [{active, true}]),
|
ok = inet:setopts(Socket, [{active, true}]),
|
||||||
lager:debug("[tcp_channel] get new socket: ~p", [Socket]),
|
lager:debug("[efka_tcp_channel] get micro service socket: ~p", [Socket]),
|
||||||
{ok, #state{socket = Socket}}.
|
{ok, #state{socket = Socket}}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
@ -122,7 +122,7 @@ handle_cast(_Request, State = #state{}) ->
|
|||||||
{noreply, NewState :: #state{}} |
|
{noreply, NewState :: #state{}} |
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
%% 处理client主动的请求
|
%% 处理micro-client:request => efka 主动的请求
|
||||||
handle_info({tcp, Socket, <<?PACKET_REQUEST:8, Data/binary>>}, State = #state{socket = Socket}) ->
|
handle_info({tcp, Socket, <<?PACKET_REQUEST:8, Data/binary>>}, State = #state{socket = Socket}) ->
|
||||||
Request = jiffy:decode(Data, [return_maps]),
|
Request = jiffy:decode(Data, [return_maps]),
|
||||||
case handle_request(Request, State) of
|
case handle_request(Request, State) of
|
||||||
@ -132,7 +132,7 @@ handle_info({tcp, Socket, <<?PACKET_REQUEST:8, Data/binary>>}, State = #state{so
|
|||||||
{stop, Reason, NewState}
|
{stop, Reason, NewState}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% 处理client的响应
|
%% 处理micro-client:response => efka 的响应
|
||||||
handle_info({tcp, Socket, <<?PACKET_RESPONSE:8, Data/binary>>}, State = #state{socket = Socket, inflight = Inflight}) ->
|
handle_info({tcp, Socket, <<?PACKET_RESPONSE:8, Data/binary>>}, State = #state{socket = Socket, inflight = Inflight}) ->
|
||||||
Resp = jiffy:decode(Data, [return_maps]),
|
Resp = jiffy:decode(Data, [return_maps]),
|
||||||
case Resp of
|
case Resp of
|
||||||
@ -188,9 +188,12 @@ handle_info({topic_broadcast, Topic, Content}, State = #state{socket = Socket})
|
|||||||
|
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_info(Info, State = #state{}) ->
|
handle_info({tcp_error, Socket, Reason}, State = #state{socket = Socket, service_id = ServiceId}) ->
|
||||||
lager:debug("[tcp_channel] get info: ~p", [Info]),
|
lager:debug("[tcp_channel] tcp_error: ~p, assoc service: ~p", [Reason, ServiceId]),
|
||||||
{noreply, State}.
|
{stop, normal, State};
|
||||||
|
handle_info({tcp_closed, Socket}, State = #state{socket = Socket, service_id = ServiceId}) ->
|
||||||
|
lager:debug("[tcp_channel] tcp_closed: ~p, assoc service: ~p", [Socket, ServiceId]),
|
||||||
|
{stop, normal, State}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc This function is called by a gen_server when it is about to
|
%% @doc This function is called by a gen_server when it is about to
|
||||||
|
|||||||
@ -16,7 +16,7 @@
|
|||||||
%% API
|
%% API
|
||||||
-export([create_table/0]).
|
-export([create_table/0]).
|
||||||
-export([insert/1, get_all_services/0, get_all_service_ids/0, get_running_services/0]).
|
-export([insert/1, get_all_services/0, get_all_service_ids/0, get_running_services/0]).
|
||||||
-export([get_metrics/1, get_params/1, set_metrics/2, set_params/2, get_service/1, get_status/1, change_status/2]).
|
-export([get_config_json/1, set_config/2, get_service/1, get_status/1, change_status/2]).
|
||||||
-export([display_services/0]).
|
-export([display_services/0]).
|
||||||
|
|
||||||
create_table() ->
|
create_table() ->
|
||||||
@ -52,14 +52,14 @@ change_status(ServiceId, NewStatus) when is_binary(ServiceId), is_integer(NewSta
|
|||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec set_params(ServiceId :: binary(), Params :: binary()) -> ok | {error, Reason :: any()}.
|
-spec set_config(ServiceId :: binary(), ConfigJson :: binary()) -> ok | {error, Reason :: any()}.
|
||||||
set_params(ServiceId, Params) when is_binary(ServiceId), is_binary(Params) ->
|
set_config(ServiceId, ConfigJson) when is_binary(ServiceId), is_binary(ConfigJson) ->
|
||||||
Fun = fun() ->
|
Fun = fun() ->
|
||||||
case mnesia:read(?TAB, ServiceId, write) of
|
case mnesia:read(?TAB, ServiceId, write) of
|
||||||
[] ->
|
[] ->
|
||||||
mnesia:abort(<<"service not found">>);
|
mnesia:abort(<<"service not found">>);
|
||||||
[S] ->
|
[S] ->
|
||||||
mnesia:write(?TAB, S#service{params = Params}, write)
|
mnesia:write(?TAB, S#service{config_json = ConfigJson}, write)
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
case mnesia:transaction(Fun) of
|
case mnesia:transaction(Fun) of
|
||||||
@ -69,39 +69,13 @@ set_params(ServiceId, Params) when is_binary(ServiceId), is_binary(Params) ->
|
|||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec set_metrics(ServiceId :: binary(), Metrics :: binary()) -> ok | {error, Reason :: any()}.
|
-spec get_config_json(ServiceId :: binary()) -> error | {ok, ConfigJson :: binary()}.
|
||||||
set_metrics(ServiceId, Metrics) when is_binary(ServiceId), is_binary(Metrics) ->
|
get_config_json(ServiceId) when is_binary(ServiceId) ->
|
||||||
Fun = fun() ->
|
|
||||||
case mnesia:read(?TAB, ServiceId, write) of
|
|
||||||
[] ->
|
|
||||||
mnesia:abort(<<"service not found">>);
|
|
||||||
[S] ->
|
|
||||||
mnesia:write(?TAB, S#service{metrics = Metrics}, write)
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
case mnesia:transaction(Fun) of
|
|
||||||
{'atomic', ok} ->
|
|
||||||
ok;
|
|
||||||
{'aborted', Reason} ->
|
|
||||||
{error, Reason}
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec get_params(ServiceId :: binary()) -> Params :: binary().
|
|
||||||
get_params(ServiceId) when is_binary(ServiceId) ->
|
|
||||||
case mnesia:dirty_read(?TAB, ServiceId) of
|
case mnesia:dirty_read(?TAB, ServiceId) of
|
||||||
[] ->
|
[] ->
|
||||||
<<"">>;
|
error;
|
||||||
[#service{params = Params}] ->
|
[#service{config_json = ConfigJson}] ->
|
||||||
Params
|
{ok, ConfigJson}
|
||||||
end.
|
|
||||||
|
|
||||||
-spec get_metrics(ServiceId :: binary()) -> Params :: binary().
|
|
||||||
get_metrics(ServiceId) when is_binary(ServiceId) ->
|
|
||||||
case mnesia:dirty_read(?TAB, ServiceId) of
|
|
||||||
[] ->
|
|
||||||
<<"">>;
|
|
||||||
[#service{metrics = Metrics}] ->
|
|
||||||
Metrics
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec get_status(ServiceId :: binary()) -> Status :: integer().
|
-spec get_status(ServiceId :: binary()) -> Status :: integer().
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user