统一概念
This commit is contained in:
parent
5dfba53805
commit
2e2865a62f
@ -162,7 +162,7 @@ handle_event(cast, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelA
|
|||||||
efka_transport:send(TransportPid, ?METHOD_PING, Ping),
|
efka_transport:send(TransportPid, ?METHOD_PING, Ping),
|
||||||
{keep_state, State};
|
{keep_state, State};
|
||||||
|
|
||||||
handle_event(info, {timeout, _, create_transport}, ?STATE_ACTIVATED, State) ->
|
handle_event(info, {timeout, _, create_transport}, ?STATE_DENIED, State) ->
|
||||||
{ok, Props} = application:get_env(efka, tls_server),
|
{ok, Props} = application:get_env(efka, tls_server),
|
||||||
Host = proplists:get_value(host, Props),
|
Host = proplists:get_value(host, Props),
|
||||||
Port = proplists:get_value(port, Props),
|
Port = proplists:get_value(port, Props),
|
||||||
@ -223,7 +223,7 @@ handle_event(info, {auth_reply, Reply}, ?STATE_AUTH, State = #state{transport_pi
|
|||||||
%% 激活消息
|
%% 激活消息
|
||||||
|
|
||||||
%% 微服务部署
|
%% 微服务部署
|
||||||
handle_event(info, {server_push, PacketId, <<?PUSH_DEPLOY:8, DeployBin/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
handle_event(info, {server_async_call, PacketId, <<?PUSH_DEPLOY:8, DeployBin/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
||||||
#deploy{task_id = TaskId, service_id = ServiceId, tar_url = TarUrl} = message_pb:decode_msg(DeployBin, deploy),
|
#deploy{task_id = TaskId, service_id = ServiceId, tar_url = TarUrl} = message_pb:decode_msg(DeployBin, deploy),
|
||||||
|
|
||||||
%% 短暂的等待,efka_inetd收到消息后就立即返回了
|
%% 短暂的等待,efka_inetd收到消息后就立即返回了
|
||||||
@ -238,7 +238,7 @@ handle_event(info, {server_push, PacketId, <<?PUSH_DEPLOY:8, DeployBin/binary>>}
|
|||||||
{keep_state, State};
|
{keep_state, State};
|
||||||
|
|
||||||
%% 启动微服务
|
%% 启动微服务
|
||||||
handle_event(info, {server_push, PacketId, <<?PUSH_START_SERVICE:8, ServiceId/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
handle_event(info, {server_async_call, PacketId, <<?PUSH_START_SERVICE:8, ServiceId/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
||||||
%% 短暂的等待,efka_inetd收到消息后就立即返回了
|
%% 短暂的等待,efka_inetd收到消息后就立即返回了
|
||||||
Reply = case efka_inetd:start_service(ServiceId) of
|
Reply = case efka_inetd:start_service(ServiceId) of
|
||||||
ok ->
|
ok ->
|
||||||
@ -251,7 +251,7 @@ handle_event(info, {server_push, PacketId, <<?PUSH_START_SERVICE:8, ServiceId/bi
|
|||||||
{keep_state, State};
|
{keep_state, State};
|
||||||
|
|
||||||
%% 停止微服务
|
%% 停止微服务
|
||||||
handle_event(info, {server_push, PacketId, <<?PUSH_STOP_SERVICE:8, ServiceId/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
handle_event(info, {server_async_call, PacketId, <<?PUSH_STOP_SERVICE:8, ServiceId/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
||||||
%% 短暂的等待,efka_inetd收到消息后就立即返回了
|
%% 短暂的等待,efka_inetd收到消息后就立即返回了
|
||||||
Reply = case efka_inetd:stop_service(ServiceId) of
|
Reply = case efka_inetd:stop_service(ServiceId) of
|
||||||
ok ->
|
ok ->
|
||||||
@ -264,7 +264,7 @@ handle_event(info, {server_push, PacketId, <<?PUSH_STOP_SERVICE:8, ServiceId/bin
|
|||||||
{keep_state, State};
|
{keep_state, State};
|
||||||
|
|
||||||
%% config.json配置信息
|
%% config.json配置信息
|
||||||
handle_event(info, {server_push, PacketId, <<?PUSH_SERVICE_CONFIG:8, ConfigBin/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid, push_inflight = PushInflight}) ->
|
handle_event(info, {server_async_call, PacketId, <<?PUSH_SERVICE_CONFIG:8, ConfigBin/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid, push_inflight = PushInflight}) ->
|
||||||
#push_service_config{service_id = ServiceId, config_json = ConfigJson, timeout = Timeout} = message_pb:decode_msg(ConfigBin, push_service_config),
|
#push_service_config{service_id = ServiceId, config_json = ConfigJson, timeout = Timeout} = message_pb:decode_msg(ConfigBin, push_service_config),
|
||||||
|
|
||||||
case efka_service:get_pid(ServiceId) of
|
case efka_service:get_pid(ServiceId) of
|
||||||
@ -283,7 +283,7 @@ handle_event(info, {server_push, PacketId, <<?PUSH_SERVICE_CONFIG:8, ConfigBin/b
|
|||||||
end;
|
end;
|
||||||
|
|
||||||
%% 收到需要回复的指令
|
%% 收到需要回复的指令
|
||||||
handle_event(info, {server_push, PacketId, <<?PUSH_INVOKE:8, InvokeBin/binary>>}, ?STATE_ACTIVATED, State = #state{push_inflight = PushInflight, transport_pid = TransportPid}) ->
|
handle_event(info, {server_async_call, PacketId, <<?PUSH_INVOKE:8, InvokeBin/binary>>}, ?STATE_ACTIVATED, State = #state{push_inflight = PushInflight, transport_pid = TransportPid}) ->
|
||||||
#invoke{service_id = ServiceId, payload = Payload, timeout = Timeout} = message_pb:decode_msg(InvokeBin, invoke),
|
#invoke{service_id = ServiceId, payload = Payload, timeout = Timeout} = message_pb:decode_msg(InvokeBin, invoke),
|
||||||
%% 消息发送到订阅系统
|
%% 消息发送到订阅系统
|
||||||
case efka_service:get_pid(ServiceId) of
|
case efka_service:get_pid(ServiceId) of
|
||||||
@ -302,7 +302,7 @@ handle_event(info, {server_push, PacketId, <<?PUSH_INVOKE:8, InvokeBin/binary>>}
|
|||||||
end;
|
end;
|
||||||
|
|
||||||
%% 处理task_log
|
%% 处理task_log
|
||||||
handle_event(info, {server_push, PacketId, <<?PUSH_TASK_LOG:8, TaskLogBin/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
handle_event(info, {server_async_call, PacketId, <<?PUSH_TASK_LOG:8, TaskLogBin/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
||||||
#fetch_task_log{task_id = TaskId} = message_pb:decode_msg(TaskLogBin, fetch_task_log),
|
#fetch_task_log{task_id = TaskId} = message_pb:decode_msg(TaskLogBin, fetch_task_log),
|
||||||
lager:debug("[efka_agent] get task_log request: ~p", [TaskId]),
|
lager:debug("[efka_agent] get task_log request: ~p", [TaskId]),
|
||||||
{ok, Logs} = efka_inetd_task_log:get_logs(TaskId),
|
{ok, Logs} = efka_inetd_task_log:get_logs(TaskId),
|
||||||
|
|||||||
@ -83,7 +83,6 @@ start_link(ParentPid, Host, Port) when is_pid(ParentPid), is_list(Host), is_inte
|
|||||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term()} | ignore).
|
{stop, Reason :: term()} | ignore).
|
||||||
init([ParentPid, Host, Port]) ->
|
init([ParentPid, Host, Port]) ->
|
||||||
ping_ticker(),
|
|
||||||
{ok, #state{parent_pid = ParentPid, host = Host, port = Port, socket = undefined}}.
|
{ok, #state{parent_pid = ParentPid, host = Host, port = Port, socket = undefined}}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
@ -115,6 +114,7 @@ handle_cast(connect, State = #state{host = Host, port = Port, parent_pid = Paren
|
|||||||
{ok, Socket} ->
|
{ok, Socket} ->
|
||||||
ok = ssl:controlling_process(Socket, self()),
|
ok = ssl:controlling_process(Socket, self()),
|
||||||
ParentPid ! {connect_reply, ok},
|
ParentPid ! {connect_reply, ok},
|
||||||
|
ping_ticker(),
|
||||||
{noreply, State#state{socket = Socket}};
|
{noreply, State#state{socket = Socket}};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
ParentPid ! {connect_reply, {error, Reason}},
|
ParentPid ! {connect_reply, {error, Reason}},
|
||||||
@ -169,7 +169,7 @@ handle_info({ssl, Socket, <<?PACKET_PUB, PubBin/binary>>}, State = #state{socket
|
|||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_info({ssl, Socket, <<?PACKET_ASYNC_CALL, PacketId:32, AsyncCallBin/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
|
handle_info({ssl, Socket, <<?PACKET_ASYNC_CALL, PacketId:32, AsyncCallBin/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
|
||||||
ParentPid ! {server_push, PacketId, AsyncCallBin},
|
ParentPid ! {server_async_call, PacketId, AsyncCallBin},
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
%% efka:request <-> iot:response
|
%% efka:request <-> iot:response
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user