Compare commits
No commits in common. "5787f797c3c04aef037eb1df43a74c425c7891ff" and "e7d8f3a5eb0dafa0f8e17bf63bb697a36de75ffa" have entirely different histories.
5787f797c3
...
e7d8f3a5eb
@ -50,4 +50,4 @@
|
|||||||
|
|
||||||
-define(PUSH_SERVICE_CONFIG, 16#04).
|
-define(PUSH_SERVICE_CONFIG, 16#04).
|
||||||
-define(PUSH_INVOKE, 16#05).
|
-define(PUSH_INVOKE, 16#05).
|
||||||
-define(PUSH_TASK_LOG, 16#06).
|
-define(PUSH_TASK_LOG, 16#05).
|
||||||
|
|||||||
@ -210,7 +210,7 @@ handle_cast(_Info, State = #state{}) ->
|
|||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
|
|
||||||
%% 收到请求的响应, client主动向efka发送的异步请求的响应
|
%% 收到请求的响应
|
||||||
handle_info({tcp, Socket, <<?PACKET_RESPONSE, Packet/binary>>}, State = #state{socket = Socket, inflight = Inflight}) ->
|
handle_info({tcp, Socket, <<?PACKET_RESPONSE, Packet/binary>>}, State = #state{socket = Socket, inflight = Inflight}) ->
|
||||||
case jiffy:decode(Packet, [return_maps]) of
|
case jiffy:decode(Packet, [return_maps]) of
|
||||||
#{<<"id">> := Id, <<"result">> := Result} ->
|
#{<<"id">> := Id, <<"result">> := Result} ->
|
||||||
@ -254,8 +254,8 @@ handle_info({tcp, Socket, <<?PACKET_PUSH, Packet/binary>>}, State = #state{socke
|
|||||||
{push_config_reply, Ref, {error, Reason}} when is_binary(Reason) ->
|
{push_config_reply, Ref, {error, Reason}} when is_binary(Reason) ->
|
||||||
#{<<"id">> => Id, <<"error">> => #{<<"code">> => -1, <<"message">> => Reason}}
|
#{<<"id">> => Id, <<"error">> => #{<<"code">> => -1, <<"message">> => Reason}}
|
||||||
end,
|
end,
|
||||||
JsonReply = jiffy:encode(Reply, [force_utf8]),
|
Packet = jiffy:encode(Reply, [force_utf8]),
|
||||||
ok = gen_tcp:send(Socket, <<?PACKET_RESPONSE:8, JsonReply/binary>>),
|
ok = gen_tcp:send(Socket, Packet),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
#{<<"id">> := Id, <<"method">> := <<"invoke">>, <<"params">> := #{<<"payload">> := Payload}} ->
|
#{<<"id">> := Id, <<"method">> := <<"invoke">>, <<"params">> := #{<<"payload">> := Payload}} ->
|
||||||
@ -263,15 +263,13 @@ handle_info({tcp, Socket, <<?PACKET_PUSH, Packet/binary>>}, State = #state{socke
|
|||||||
ControllerPid ! {invoke, self(), Ref, Payload},
|
ControllerPid ! {invoke, self(), Ref, Payload},
|
||||||
Reply =
|
Reply =
|
||||||
receive
|
receive
|
||||||
{invoke_reply, Ref, ok} ->
|
|
||||||
#{<<"id">> => Id, <<"result">> => <<"ok">>};
|
|
||||||
{invoke_reply, Ref, {ok, Result}} ->
|
{invoke_reply, Ref, {ok, Result}} ->
|
||||||
#{<<"id">> => Id, <<"result">> => Result};
|
#{<<"id">> => Id, <<"result">> => Result};
|
||||||
{invoke_reply, Ref, {error, Reason}} when is_binary(Reason) ->
|
{invoke_reply, Ref, {error, Reason}} when is_binary(Reason) ->
|
||||||
#{<<"id">> => Id, <<"error">> => #{<<"code">> => -1, <<"message">> => Reason}}
|
#{<<"id">> => Id, <<"error">> => #{<<"code">> => -1, <<"message">> => Reason}}
|
||||||
end,
|
end,
|
||||||
JsonReply = jiffy:encode(Reply, [force_utf8]),
|
Packet = jiffy:encode(Reply, [force_utf8]),
|
||||||
ok = gen_tcp:send(Socket, <<?PACKET_RESPONSE:8, JsonReply/binary>>),
|
ok = gen_tcp:send(Socket, Packet),
|
||||||
{noreply, State}
|
{noreply, State}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
|||||||
@ -82,7 +82,7 @@ handle_info({push_config, ReceiverPid, Ref, Config}, State = #state{}) ->
|
|||||||
{noreply, State};
|
{noreply, State};
|
||||||
handle_info({invoke, ReceiverPid, Ref, Payload}, State = #state{}) ->
|
handle_info({invoke, ReceiverPid, Ref, Payload}, State = #state{}) ->
|
||||||
lager:debug("[efka_client_broker] get invoke: ~p", [Payload]),
|
lager:debug("[efka_client_broker] get invoke: ~p", [Payload]),
|
||||||
ReceiverPid ! {invoke_reply, Ref, {ok, <<"yes invoke me">>}},
|
ReceiverPid ! {invoke_reply, Ref, ok},
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
|
|||||||
@ -299,7 +299,6 @@ handle_info({server_push, PacketId, <<?PUSH_INVOKE:8, InvokeBin/binary>>}, State
|
|||||||
%% 处理task_log
|
%% 处理task_log
|
||||||
handle_info({server_push, PacketId, <<?PUSH_TASK_LOG:8, TaskLogBin/binary>>}, State = #state{status = ?STATE_ACTIVATED}) ->
|
handle_info({server_push, PacketId, <<?PUSH_TASK_LOG:8, TaskLogBin/binary>>}, State = #state{status = ?STATE_ACTIVATED}) ->
|
||||||
#fetch_task_log{task_id = TaskId} = message_pb:decode_msg(TaskLogBin, fetch_task_log),
|
#fetch_task_log{task_id = TaskId} = message_pb:decode_msg(TaskLogBin, fetch_task_log),
|
||||||
lager:debug("[efka_agent] get task_log request: ~p", [TaskId]),
|
|
||||||
{ok, Logs} = efka_inetd_task_log:get_logs(TaskId),
|
{ok, Logs} = efka_inetd_task_log:get_logs(TaskId),
|
||||||
Reply = case length(Logs) > 0 of
|
Reply = case length(Logs) > 0 of
|
||||||
true ->
|
true ->
|
||||||
@ -341,8 +340,8 @@ handle_info({service_reply, Ref, EmsReply}, State = #state{inflight = Inflight})
|
|||||||
{noreply, State};
|
{noreply, State};
|
||||||
{PacketId, NInflight} ->
|
{PacketId, NInflight} ->
|
||||||
Reply = case EmsReply of
|
Reply = case EmsReply of
|
||||||
{ok, Result} ->
|
ok ->
|
||||||
#async_call_reply{code = 1, result = Result};
|
#async_call_reply{code = 1, message = <<"">>, result = <<>>};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
#async_call_reply{code = 0, message = Reason}
|
#async_call_reply{code = 0, message = Reason}
|
||||||
end,
|
end,
|
||||||
|
|||||||
@ -114,7 +114,7 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
|
|||||||
do_deploy(TaskId, ServiceRootDir, ServiceId, TarUrl) when is_integer(TaskId), is_list(ServiceRootDir), is_binary(ServiceId), is_binary(TarUrl) ->
|
do_deploy(TaskId, ServiceRootDir, ServiceId, TarUrl) when is_integer(TaskId), is_list(ServiceRootDir), is_binary(ServiceId), is_binary(TarUrl) ->
|
||||||
case download(binary_to_list(TarUrl), ServiceRootDir) of
|
case download(binary_to_list(TarUrl), ServiceRootDir) of
|
||||||
{ok, TarFile, CostTs} ->
|
{ok, TarFile, CostTs} ->
|
||||||
Log = io_lib:format("download: ~p completed, cost time: ~p(ms)", [binary_to_list(TarUrl), CostTs]),
|
Log = io_lib:format("[efka_inetd_task] download: ~p completed, cost time: ~p(ms)", [binary_to_list(TarUrl), CostTs]),
|
||||||
efka_inetd_task_log:stash(TaskId, list_to_binary(Log)),
|
efka_inetd_task_log:stash(TaskId, list_to_binary(Log)),
|
||||||
|
|
||||||
%% 创建工作目录
|
%% 创建工作目录
|
||||||
|
|||||||
@ -35,7 +35,7 @@ stash(TaskId, Log) when is_integer(TaskId), is_binary(Log) ->
|
|||||||
stash(TaskId, Items) when is_integer(TaskId), is_list(Items) ->
|
stash(TaskId, Items) when is_integer(TaskId), is_list(Items) ->
|
||||||
{{Y, M, D}, {H, I, S}} = calendar:local_time(),
|
{{Y, M, D}, {H, I, S}} = calendar:local_time(),
|
||||||
TimePrefix = iolist_to_binary(io_lib:format("[~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b]", [Y, M, D, H, I, S])),
|
TimePrefix = iolist_to_binary(io_lib:format("[~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b]", [Y, M, D, H, I, S])),
|
||||||
Log = iolist_to_binary([TimePrefix, <<" ">>, lists:join(<<" ">>, Items)]),
|
Log = iolist_to_binary([TimePrefix, <<" ">>, lists:join(<<" ">>, Items), <<$\n>>]),
|
||||||
gen_server:cast(?SERVER, {stash, TaskId, Log}).
|
gen_server:cast(?SERVER, {stash, TaskId, Log}).
|
||||||
|
|
||||||
-spec flush(TaskId :: integer()) -> no_return().
|
-spec flush(TaskId :: integer()) -> no_return().
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user