diff --git a/apps/efka/src/client/efka_client.erl b/apps/efka/src/client/efka_client.erl index 27aa5ea..ba9f36b 100644 --- a/apps/efka/src/client/efka_client.erl +++ b/apps/efka/src/client/efka_client.erl @@ -75,11 +75,13 @@ request_config() -> -spec device_offline(DeviceUUID :: binary()) -> no_return(). device_offline(DeviceUUID) when is_binary(DeviceUUID) -> - send_event(1, #{<<"device_uuid">> => DeviceUUID, <<"status">> => 0}). + EventBody = jiffy:encode(#{<<"device_uuid">> => DeviceUUID, <<"status">> => 0}, [force_utf8]), + send_event(1, EventBody). -spec device_online(DeviceUUID :: binary()) -> no_return(). device_online(DeviceUUID) when is_binary(DeviceUUID) -> - send_event(1, #{<<"device_uuid">> => DeviceUUID, <<"status">> => 1}). + EventBody = jiffy:encode(#{<<"device_uuid">> => DeviceUUID, <<"status">> => 1}, [force_utf8]), + send_event(1, EventBody). -spec send_event(EventType :: integer(), Params :: binary()) -> no_return(). send_event(EventType, Params) when is_integer(EventType), is_binary(Params) -> diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 4a9d752..ce36daf 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -243,7 +243,7 @@ handle_info({server_push, PacketId, <>}, State {error, Reason} when is_binary(Reason) -> #push_reply{code = 0, message = Reason} end, - is_pid(TransportPid) andalso efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)), + is_pid(TransportPid) andalso efka_transport:push_response(TransportPid, PacketId, message_pb:encode_msg(Reply)), {noreply, State}; @@ -254,7 +254,7 @@ handle_info({server_push, PacketId, <> case efka_service:get_pid(ServiceId) of undefined -> Reply = #push_reply{code = 0, message = <<"service not run">>}, - is_pid(TransportPid) andalso efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)), + is_pid(TransportPid) andalso efka_transport:push_response(TransportPid, PacketId, message_pb:encode_msg(Reply)), {noreply, State}; ServicePid when is_pid(ServicePid) -> Ref = make_ref(), @@ -264,8 +264,7 @@ handle_info({server_push, PacketId, <> erlang:start_timer(Timeout * 1000, self(), {request_timeout, Ref}), {noreply, State#state{inflight = maps:put(Ref, PacketId, Inflight)}} - end, - {noreply, State}; + end; %% 收到需要回复的指令 handle_info({server_push, PacketId, Invoke = #invoke{service_id = ServiceId, payload = Payload, timeout = Timeout}}, @@ -276,7 +275,8 @@ handle_info({server_push, PacketId, Invoke = #invoke{service_id = ServiceId, pay case efka_service:get_pid(ServiceId) of undefined -> Reply = #push_reply{code = 0, message = <<"micro_service not run">>, result = <<>>}, - safe_push_response(PacketId, message_pb:encode_msg(Reply), State); + safe_push_response(PacketId, message_pb:encode_msg(Reply), State), + {noreply, State}; ServicePid when is_pid(ServicePid) -> Ref = make_ref(), efka_service:invoke(ServicePid, Ref, Payload), @@ -284,9 +284,7 @@ handle_info({server_push, PacketId, Invoke = #invoke{service_id = ServiceId, pay erlang:start_timer(Timeout * 1000, self(), {request_timeout, Ref}), {noreply, State#state{inflight = maps:put(Ref, PacketId, Inflight)}} - end, - - {noreply, State}; + end; %% 处理命令 handle_info({server_command, ?COMMAND_AUTH, <>}, State = #state{transport_pid = TransportPid, status = Status}) -> diff --git a/apps/efka/src/efka_inetd.erl b/apps/efka/src/efka_inetd.erl index ea570a0..b5adad0 100644 --- a/apps/efka/src/efka_inetd.erl +++ b/apps/efka/src/efka_inetd.erl @@ -71,10 +71,23 @@ init([]) -> {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). handle_call({deploy, TaskId, ServiceId, TarUrl}, _From, State = #state{root_dir = RootDir, task_map = TaskMap}) -> - {ok, TaskPid} = efka_inetd_task:start_link(TaskId, RootDir, ServiceId, TarUrl), - efka_inetd_task:deploy(TaskPid), + %% 创建目录 + {ok, ServiceRootDir} = ensure_dirs(RootDir, ServiceId), + case check_lock(ServiceRootDir, TarUrl) of + true -> + {reply, ok, State}; + false -> + case check_download_url(TarUrl) of + ok -> + {ok, TaskPid} = efka_inetd_task:start_link(TaskId, ServiceRootDir, ServiceId, TarUrl), + efka_inetd_task:deploy(TaskPid), - {reply, ok, State#state{task_map = maps:put(TaskPid, {TaskId, ServiceId}, TaskMap)}}; + {reply, ok, State#state{task_map = maps:put(TaskPid, {TaskId, ServiceId}, TaskMap)}}; + {error, Reason} -> + lager:debug("[efka_inetd] check_download_url: ~p, get error: ~p", [TarUrl, Reason]), + {reply, {error, <<"download url error">>}, State} + end + end; handle_call(_Request, _From, State = #state{}) -> {reply, ok, State}. @@ -136,3 +149,38 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== +-spec ensure_dirs(RootDir :: string(), ServerId :: binary()) -> {ok, ServerRootDir :: string()}. +ensure_dirs(RootDir, ServerId) when is_list(RootDir), is_binary(ServerId) -> + %% 根目录 + ServiceRootDir = RootDir ++ "/" ++ binary_to_list(ServerId) ++ "/", + ok = filelib:ensure_dir(ServiceRootDir), + {ok, ServiceRootDir}. + +-spec check_lock(DirName :: string(), TarUrl :: binary()) -> boolean(). +check_lock(DirName, TarUrl) when is_list(DirName), is_binary(TarUrl) -> + FileName = DirName ++ ".efka.lock", + case filelib:is_file(FileName) of + true -> + {ok, Content} = file:read_file(FileName), + Content =:= TarUrl; + false -> + false + end. + +%% 通过head请求先判定下载地址是否正确 +-spec check_download_url(Url :: string() | binary()) -> ok | {error, Reason :: term()}. +check_download_url(Url) when is_binary(Url) -> + check_download_url(binary_to_list(Url)); +check_download_url(Url) when is_list(Url) -> + SslOpts = [ + {ssl, [ + % 完全禁用证书验证 + {verify, verify_none} + ]} + ], + case httpc:request(head, {Url, []}, SslOpts, [{sync, true}]) of + {ok, {{_, 200, "OK"}, _Headers, _}} -> + ok; + {error, Reason} -> + {error, Reason} + end. \ No newline at end of file diff --git a/apps/efka/src/efka_inetd_task.erl b/apps/efka/src/efka_inetd_task.erl index 0534894..37ca96d 100644 --- a/apps/efka/src/efka_inetd_task.erl +++ b/apps/efka/src/efka_inetd_task.erl @@ -22,7 +22,7 @@ -define(SERVER, ?MODULE). -record(state, { - root_dir :: string(), + service_root_dir :: string(), task_id :: integer(), service_id :: binary(), tar_url :: binary() @@ -37,10 +37,10 @@ deploy(Pid) when is_pid(Pid) -> gen_server:cast(Pid, deploy). %% @doc Spawns the server and registers the local name (unique) --spec(start_link(TaskId :: integer(), RootDir :: string(), ServiceId :: binary(), TarUrl :: binary()) -> +-spec(start_link(TaskId :: integer(), ServiceRootDir :: string(), ServiceId :: binary(), TarUrl :: binary()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(TaskId, RootDir, ServiceId, TarUrl) when is_integer(TaskId), is_list(RootDir), is_binary(ServiceId), is_binary(TarUrl) -> - gen_server:start_link(?MODULE, [TaskId, RootDir, ServiceId, TarUrl], []). +start_link(TaskId, ServiceRootDir, ServiceId, TarUrl) when is_integer(TaskId), is_list(ServiceRootDir), is_binary(ServiceId), is_binary(TarUrl) -> + gen_server:start_link(?MODULE, [TaskId, ServiceRootDir, ServiceId, TarUrl], []). %%%=================================================================== %%% gen_server callbacks @@ -51,8 +51,8 @@ start_link(TaskId, RootDir, ServiceId, TarUrl) when is_integer(TaskId), is_list( -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([TaskId, RootDir, ServiceId, TarUrl]) -> - {ok, #state{task_id = TaskId, root_dir = RootDir, service_id = ServiceId, tar_url = TarUrl}}. +init([TaskId, ServiceRootDir, ServiceId, TarUrl]) -> + {ok, #state{task_id = TaskId, service_root_dir = ServiceRootDir, service_id = ServiceId, tar_url = TarUrl}}. %% @private %% @doc Handling call messages @@ -73,53 +73,38 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast(deploy, State = #state{task_id = TaskId, root_dir = RootDir, service_id = ServiceId, tar_url = TarUrl}) -> - %% 创建目录 - {ok, ServiceRootDir} = ensure_dirs(RootDir, ServiceId), - case check_lock(ServiceRootDir, TarUrl) of - true -> - {stop, normal, State}; - false -> - case check_download_url(TarUrl) of +handle_cast(deploy, State = #state{task_id = TaskId, service_root_dir = ServiceRootDir, service_id = ServiceId, tar_url = TarUrl}) -> + case download(binary_to_list(TarUrl), ServiceRootDir) of + {ok, TarFile} -> + efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"download completed">>), + {ok, WorkDir} = make_work_dir(ServiceRootDir), + %% 清理目录下的文件 + Result = delete_directory(WorkDir), + lager:debug("delete_directory result is: ~p", [Result]), + case tar_extract(TarFile, WorkDir) of ok -> - case download(binary_to_list(TarUrl), ServiceRootDir) of - {ok, TarFile} -> - efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"download completed">>), - {ok, WorkDir} = make_work_dir(ServiceRootDir), - %% 清理目录下的文件 - Result = delete_directory(WorkDir), - lager:debug("delete_directory result is: ~p", [Result]), - - case tar_extract(TarFile, WorkDir) of - ok -> - %% 创建lock文件 - touch_lock(ServiceRootDir, TarUrl), - %% 更新数据 - ok = service_model:insert(#service{ - service_id = ServiceId, - tar_url = TarUrl, - %% 工作目录 - root_dir = ServiceRootDir, - params = <<"">>, - metrics = <<"">>, - %% 状态: 0: 停止, 1: 运行中 - status = 0 - }), - efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"deploy success">>), - {stop, normal, State}; - {error, Reason} -> - efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"tar decompression error">>), - {stop, {error, Reason}, State} - end; - {error, Reason} -> - efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"download error">>), - {stop, {error, Reason}, State} - end; + %% 创建lock文件 + touch_lock(ServiceRootDir, TarUrl), + %% 更新数据 + ok = service_model:insert(#service{ + service_id = ServiceId, + tar_url = TarUrl, + %% 工作目录 + root_dir = ServiceRootDir, + params = <<"">>, + metrics = <<"">>, + %% 状态: 0: 停止, 1: 运行中 + status = 0 + }), + efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"deploy success">>), + {stop, normal, State}; {error, Reason} -> - lager:debug("[efka_inetd] check_download_url: ~p, get error: ~p", [TarUrl, Reason]), - efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"download url error">>), - {stop, {error, <<"download url error">>}, State} - end + efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"tar decompression error">>), + {stop, {error, Reason}, State} + end; + {error, Reason} -> + efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"download error">>), + {stop, {error, Reason}, State} end. %% @private @@ -153,13 +138,6 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== --spec ensure_dirs(RootDir :: string(), ServerId :: binary()) -> {ok, ServerRootDir :: string()}. -ensure_dirs(RootDir, ServerId) when is_list(RootDir), is_binary(ServerId) -> - %% 根目录 - ServiceRootDir = RootDir ++ "/" ++ binary_to_list(ServerId) ++ "/", - ok = filelib:ensure_dir(ServiceRootDir), - {ok, ServiceRootDir}. - %% 工作逻辑,压缩文件需要解压到工作目录 -spec make_work_dir(ServiceRootDir :: string()) -> {ok, WorkDir :: string()}. make_work_dir(ServiceRootDir) when is_list(ServiceRootDir) -> @@ -191,17 +169,6 @@ delete_directory(Dir) when is_list(Dir) -> {error, Reason} end. --spec check_lock(DirName :: string(), TarUrl :: binary()) -> boolean(). -check_lock(DirName, TarUrl) when is_list(DirName), is_binary(TarUrl) -> - FileName = DirName ++ ".efka.lock", - case filelib:is_file(FileName) of - true -> - {ok, Content} = file:read_file(FileName), - Content =:= TarUrl; - false -> - false - end. - -spec touch_lock(DirName :: string(), TarUrl :: binary()) -> boolean(). touch_lock(DirName, TarUrl) when is_list(DirName), is_binary(TarUrl) -> FileName = DirName ++ ".efka.lock", @@ -213,23 +180,6 @@ touch_lock(DirName, TarUrl) when is_list(DirName), is_binary(TarUrl) -> false end. -%% 通过head请求先判定下载地址是否正确 -check_download_url(Url) when is_binary(Url) -> - check_download_url(binary_to_list(Url)); -check_download_url(Url) when is_list(Url) -> - SslOpts = [ - {ssl, [ - % 完全禁用证书验证 - {verify, verify_none} - ]} - ], - case httpc:request(head, {Url, []}, SslOpts, [{sync, true}]) of - {ok, {{_, 200, "OK"}, _Headers, _}} -> - ok; - {error, Reason} -> - {error, Reason} - end. - %% 解压文件到指定目录 -spec tar_extract(TarFile :: string(), TargetDir :: string()) -> ok | {error, Reason :: term()}. tar_extract(TarFile, TargetDir) when is_list(TarFile), is_list(TargetDir) -> @@ -243,7 +193,7 @@ tar_extract(TarFile, TargetDir) when is_list(TarFile), is_list(TargetDir) -> end. %% 下载文件 --spec download(Url :: string(), TargetDir :: string()) -> {ok, TarFile :: string()} | {error, Reason :: any()}. +-spec download(Url :: string(), TargetDir :: string()) -> {ok, TarFile :: string()} | {error, Reason :: term()}. download(Url, TargetDir) when is_list(Url), is_list(TargetDir) -> SslOpts = [ {ssl, [ diff --git a/apps/efka/src/efka_service.erl b/apps/efka/src/efka_service.erl index e69c7bb..2490b3c 100644 --- a/apps/efka/src/efka_service.erl +++ b/apps/efka/src/efka_service.erl @@ -20,7 +20,7 @@ %% API -export([start_link/2]). -export([get_name/1, get_pid/1, start_service/1, stop_service/1, attach_channel/2]). --export([push_config/3, request_config/1]). +-export([push_config/3, request_config/1, invoke/3]). -export([metric_data/3, send_event/3]). %% gen_server callbacks @@ -76,10 +76,6 @@ invoke(Pid, Ref, Payload) when is_pid(Pid), is_reference(Ref), is_binary(Payload request_config(Pid) when is_pid(Pid) -> gen_server:call(Pid, request_config). -%% todo -async_request(Pid, Method, Params) when is_binary(Method), is_binary(Params) -> - ok. - metric_data(Pid, DeviceUUID, Data) when is_pid(Pid), is_binary(DeviceUUID), is_binary(Data) -> gen_server:cast(Pid, {metric_data, DeviceUUID, Data}).