diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 5752770..b9c95a4 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -263,7 +263,7 @@ handle_info({server_push_message, PacketId, <>}, State = #state{transport_pid = TransportPid, inflight = Inflight}) -> #service_config{service_id = ServiceId, config_json = ConfigJson, timeout = Timeout} = message_pb:decode_msg(ParamsBin, service_config), - case efka_micro_service:get_pid(ServiceId) of + case efka_service:get_pid(ServiceId) of undefined -> Reply = #efka_response { code = 0, @@ -273,7 +273,7 @@ handle_info({server_push_message, PacketId, < Ref = make_ref(), - efka_micro_service:push_config(ServicePid, Ref, ConfigJson), + efka_service:push_config(ServicePid, Ref, ConfigJson), %% 处理超时逻辑 erlang:start_timer(Timeout * 1000, self(), {request_timeout, Ref}), @@ -282,7 +282,7 @@ handle_info({server_push_message, PacketId, < case maps:take(Ref, Inflight) of error -> diff --git a/apps/efka/src/efka_service.erl b/apps/efka/src/efka_service.erl index 1172567..2fb79a0 100644 --- a/apps/efka/src/efka_service.erl +++ b/apps/efka/src/efka_service.erl @@ -112,14 +112,14 @@ start_link(Name, ServiceId) when is_atom(Name), is_binary(ServiceId) -> init([ServiceId]) -> case service_model:get_service(ServiceId) of error -> - lager:notice("[efka_micro_service] service_id: ~p, not found", [ServiceId]), + lager:notice("[efka_service] service_id: ~p, not found", [ServiceId]), ignore; {ok, Service = #micro_service{root_dir = RootDir}} -> case efka_manifest:new(RootDir) of {ok, Manifest} -> init0(Service, Manifest); {error, Reason} -> - lager:notice("[efka_micro_service] service: ~p, read manifest.json get error: ~p", [ServiceId, Reason]), + lager:notice("[efka_service] service: ~p, read manifest.json get error: ~p", [ServiceId, Reason]), ignore end end. @@ -129,14 +129,14 @@ init0(#micro_service{service_id = ServiceId, status = 1}, Manifest) -> case efka_manifest:startup(Manifest) of {ok, Port} -> {os_pid, OSPid} = erlang:port_info(Port, os_pid), - lager:debug("[efka_micro_service] service: ~p, port: ~p, boot_service success os_pid: ~p", [ServiceId, Port, OSPid]), + lager:debug("[efka_service] service: ~p, port: ~p, boot_service success os_pid: ~p", [ServiceId, Port, OSPid]), {ok, #state{service_id = ServiceId, manifest = Manifest, running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}}; {error, Reason} -> - lager:debug("[efka_micro_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]), + lager:debug("[efka_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]), {ok, #state{service_id = ServiceId, manifest = Manifest, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}} end; init0(#micro_service{service_id = ServiceId, status = 0}, Manifest) -> - lager:debug("[efka_micro_service] service: ~p current status is 0, not boot"), + lager:debug("[efka_service] service: ~p current status is 0, not boot"), {ok, #state{service_id = ServiceId, manifest = Manifest, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}}. %% @private @@ -244,16 +244,16 @@ handle_cast(_Request, State = #state{}) -> handle_info({timeout, _, reboot_service}, State = #state{service_id = ServiceId, manifest = Manifest}) -> case service_model:get_status(ServiceId) of 0 -> - lager:debug("[efka_micro_service] service_id: ~p, is stopped, ignore boot_service", [ServiceId]), + lager:debug("[efka_service] service_id: ~p, is stopped, ignore boot_service", [ServiceId]), {noreply, State}; 1 -> case efka_manifest:startup(Manifest) of {ok, Port} -> {os_pid, OSPid} = erlang:port_info(Port, os_pid), - lager:debug("[efka_micro_service] reboot service success: ~p, port: ~p, os_pid: ~p", [ServiceId, Port, OSPid]), + lager:debug("[efka_service] reboot service success: ~p, port: ~p, os_pid: ~p", [ServiceId, Port, OSPid]), {noreply, State#state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}}; {error, Reason} -> - lager:debug("[efka_micro_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]), + lager:debug("[efka_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]), {noreply, State#state{running_status = ?STATUS_STOPPED}} end end; @@ -269,18 +269,18 @@ handle_info({channel_reply, Ref, Reply}, State = #state{inflight = Inflight}) -> end; handle_info({Port, {data, Data}}, State = #state{port = Port, service_id = ServiceId}) -> - lager:debug("[efka_micro_service] service_id: ~p, port data: ~p", [ServiceId, Data]), + lager:debug("[efka_service] service_id: ~p, port data: ~p", [ServiceId, Data]), {noreply, State}; %% 处理port的消息, Port的被动关闭会触发;因此这个时候的Port和State.port的值是相等的 handle_info({Port, {exit_status, Code}}, State = #state{service_id = ServiceId}) -> - lager:debug("[efka_micro_service] service_id: ~p, port: ~p, exit with code: ~p", [ServiceId, Port, Code]), + lager:debug("[efka_service] service_id: ~p, port: ~p, exit with code: ~p", [ServiceId, Port, Code]), % erlang:start_timer(5000, self(), reboot_service), {noreply, State#state{port = undefined, os_pid = undefined, running_status = ?STATUS_STOPPED}}; %% 处理channel进程的退出 handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, service_id = ServiceId}) -> - lager:debug("[efka_micro_service] service: ~p, channel exited: ~p", [ServiceId, Reason]), + lager:debug("[efka_service] service: ~p, channel exited: ~p", [ServiceId, Reason]), {noreply, State#state{channel_pid = undefined, inflight = #{}}}. %% @private @@ -291,11 +291,11 @@ handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_ -spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), State :: #state{}) -> term()). terminate(Reason, _State = #state{os_pid = OSPid}) -> - lager:debug("[efka_micro_service] terminate with reason: ~p", [Reason]), + lager:debug("[efka_service] terminate with reason: ~p", [Reason]), kill_os_pid(OSPid), ok; terminate(Reason, _State) -> - lager:debug("[efka_micro_service] terminate with reason: ~p", [Reason]), + lager:debug("[efka_service] terminate with reason: ~p", [Reason]), ok. %% @private diff --git a/apps/efka/src/efka_sup.erl b/apps/efka/src/efka_sup.erl index e9c6a70..79feeb9 100644 --- a/apps/efka/src/efka_sup.erl +++ b/apps/efka/src/efka_sup.erl @@ -56,12 +56,12 @@ init([]) -> }, #{ - id => 'efka_server_sup', - start => {'efka_micro_service_sup', start_link, []}, + id => 'efka_service_sup', + start => {'efka_service_sup', start_link, []}, restart => permanent, shutdown => 2000, type => supervisor, - modules => ['efka_micro_service_sup'] + modules => ['efka_service_sup'] } ], {ok, {SupFlags, ChildSpecs}}. diff --git a/apps/efka/src/efka_tcp_channel.erl b/apps/efka/src/efka_tcp_channel.erl index e6e0a97..1ac2cd6 100644 --- a/apps/efka/src/efka_tcp_channel.erl +++ b/apps/efka/src/efka_tcp_channel.erl @@ -113,11 +113,11 @@ handle_cast(_Request, State = #state{}) -> {stop, Reason :: term(), NewState :: #state{}}). %% 注册 handle_info({tcp, Socket, <>}, State = #state{socket = Socket}) -> - case efka_micro_service:get_pid(ServiceId) of + case efka_service:get_pid(ServiceId) of undefined -> ok = gen_tcp:send(Socket, <>); Pid when is_pid(Pid) -> - case efka_micro_service:attach_channel(Pid, self()) of + case efka_service:attach_channel(Pid, self()) of ok -> ok = gen_tcp:send(Socket, <>), {noreply, State#state{service_id = ServiceId, service_pid = Pid, is_registered = true}}; @@ -129,23 +129,23 @@ handle_info({tcp, Socket, <>} %% 请求参数 handle_info({tcp, Socket, <>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> - {ok, ConfigJson} = efka_micro_service:request_config(ServicePid), + {ok, ConfigJson} = efka_service:request_config(ServicePid), ok = gen_tcp:send(Socket, <>), {noreply, State}; %% 数据项 handle_info({tcp, Socket, <<0:32, ?PACKET_METRIC_DATA:8, Len:8, DeviceUUID:Len/binary, Data/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> - efka_micro_service:metric_data(ServicePid, DeviceUUID, Data), + efka_service:metric_data(ServicePid, DeviceUUID, Data), {noreply, State}; %% Event事件 handle_info({tcp, Socket, <<0:32, ?PACKET_EVENT:8, EventType:16, Params/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> - efka_micro_service:send_event(ServicePid, EventType, Params), + efka_service:send_event(ServicePid, EventType, Params), {noreply, State}; %% AIEvent事件 handle_info({tcp, Socket, <<0:32, ?PACKET_AI_EVENT:8, EventType:16, Params/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> - efka_micro_service:send_ai_event(ServicePid, EventType, Params), + efka_service:send_ai_event(ServicePid, EventType, Params), {noreply, State}; %% 收到端上的响应