fix
This commit is contained in:
parent
c9a04da151
commit
1d35970750
@ -263,7 +263,7 @@ handle_info({server_push_message, PacketId, <<?METHOD_DEPLOY:8, DeployBin/binary
|
|||||||
handle_info({server_push_message, PacketId, <<?METHOD_CONFIG:8, ParamsBin/binary>>}, State = #state{transport_pid = TransportPid, inflight = Inflight}) ->
|
handle_info({server_push_message, PacketId, <<?METHOD_CONFIG:8, ParamsBin/binary>>}, State = #state{transport_pid = TransportPid, inflight = Inflight}) ->
|
||||||
#service_config{service_id = ServiceId, config_json = ConfigJson, timeout = Timeout} = message_pb:decode_msg(ParamsBin, service_config),
|
#service_config{service_id = ServiceId, config_json = ConfigJson, timeout = Timeout} = message_pb:decode_msg(ParamsBin, service_config),
|
||||||
|
|
||||||
case efka_micro_service:get_pid(ServiceId) of
|
case efka_service:get_pid(ServiceId) of
|
||||||
undefined ->
|
undefined ->
|
||||||
Reply = #efka_response {
|
Reply = #efka_response {
|
||||||
code = 0,
|
code = 0,
|
||||||
@ -273,7 +273,7 @@ handle_info({server_push_message, PacketId, <<?METHOD_CONFIG:8, ParamsBin/binary
|
|||||||
{noreply, State};
|
{noreply, State};
|
||||||
ServicePid when is_pid(ServicePid) ->
|
ServicePid when is_pid(ServicePid) ->
|
||||||
Ref = make_ref(),
|
Ref = make_ref(),
|
||||||
efka_micro_service:push_config(ServicePid, Ref, ConfigJson),
|
efka_service:push_config(ServicePid, Ref, ConfigJson),
|
||||||
|
|
||||||
%% 处理超时逻辑
|
%% 处理超时逻辑
|
||||||
erlang:start_timer(Timeout * 1000, self(), {request_timeout, Ref}),
|
erlang:start_timer(Timeout * 1000, self(), {request_timeout, Ref}),
|
||||||
@ -282,7 +282,7 @@ handle_info({server_push_message, PacketId, <<?METHOD_CONFIG:8, ParamsBin/binary
|
|||||||
end,
|
end,
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
%% 收到来自efka_micro_service的回复
|
%% 收到来自efka_service的回复
|
||||||
handle_info({ems_reply, Ref, EmsReply}, State = #state{inflight = Inflight}) ->
|
handle_info({ems_reply, Ref, EmsReply}, State = #state{inflight = Inflight}) ->
|
||||||
case maps:take(Ref, Inflight) of
|
case maps:take(Ref, Inflight) of
|
||||||
error ->
|
error ->
|
||||||
|
|||||||
@ -112,14 +112,14 @@ start_link(Name, ServiceId) when is_atom(Name), is_binary(ServiceId) ->
|
|||||||
init([ServiceId]) ->
|
init([ServiceId]) ->
|
||||||
case service_model:get_service(ServiceId) of
|
case service_model:get_service(ServiceId) of
|
||||||
error ->
|
error ->
|
||||||
lager:notice("[efka_micro_service] service_id: ~p, not found", [ServiceId]),
|
lager:notice("[efka_service] service_id: ~p, not found", [ServiceId]),
|
||||||
ignore;
|
ignore;
|
||||||
{ok, Service = #micro_service{root_dir = RootDir}} ->
|
{ok, Service = #micro_service{root_dir = RootDir}} ->
|
||||||
case efka_manifest:new(RootDir) of
|
case efka_manifest:new(RootDir) of
|
||||||
{ok, Manifest} ->
|
{ok, Manifest} ->
|
||||||
init0(Service, Manifest);
|
init0(Service, Manifest);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
lager:notice("[efka_micro_service] service: ~p, read manifest.json get error: ~p", [ServiceId, Reason]),
|
lager:notice("[efka_service] service: ~p, read manifest.json get error: ~p", [ServiceId, Reason]),
|
||||||
ignore
|
ignore
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
@ -129,14 +129,14 @@ init0(#micro_service{service_id = ServiceId, status = 1}, Manifest) ->
|
|||||||
case efka_manifest:startup(Manifest) of
|
case efka_manifest:startup(Manifest) of
|
||||||
{ok, Port} ->
|
{ok, Port} ->
|
||||||
{os_pid, OSPid} = erlang:port_info(Port, os_pid),
|
{os_pid, OSPid} = erlang:port_info(Port, os_pid),
|
||||||
lager:debug("[efka_micro_service] service: ~p, port: ~p, boot_service success os_pid: ~p", [ServiceId, Port, OSPid]),
|
lager:debug("[efka_service] service: ~p, port: ~p, boot_service success os_pid: ~p", [ServiceId, Port, OSPid]),
|
||||||
{ok, #state{service_id = ServiceId, manifest = Manifest, running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}};
|
{ok, #state{service_id = ServiceId, manifest = Manifest, running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
lager:debug("[efka_micro_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]),
|
lager:debug("[efka_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]),
|
||||||
{ok, #state{service_id = ServiceId, manifest = Manifest, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}}
|
{ok, #state{service_id = ServiceId, manifest = Manifest, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}}
|
||||||
end;
|
end;
|
||||||
init0(#micro_service{service_id = ServiceId, status = 0}, Manifest) ->
|
init0(#micro_service{service_id = ServiceId, status = 0}, Manifest) ->
|
||||||
lager:debug("[efka_micro_service] service: ~p current status is 0, not boot"),
|
lager:debug("[efka_service] service: ~p current status is 0, not boot"),
|
||||||
{ok, #state{service_id = ServiceId, manifest = Manifest, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}}.
|
{ok, #state{service_id = ServiceId, manifest = Manifest, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
@ -244,16 +244,16 @@ handle_cast(_Request, State = #state{}) ->
|
|||||||
handle_info({timeout, _, reboot_service}, State = #state{service_id = ServiceId, manifest = Manifest}) ->
|
handle_info({timeout, _, reboot_service}, State = #state{service_id = ServiceId, manifest = Manifest}) ->
|
||||||
case service_model:get_status(ServiceId) of
|
case service_model:get_status(ServiceId) of
|
||||||
0 ->
|
0 ->
|
||||||
lager:debug("[efka_micro_service] service_id: ~p, is stopped, ignore boot_service", [ServiceId]),
|
lager:debug("[efka_service] service_id: ~p, is stopped, ignore boot_service", [ServiceId]),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
1 ->
|
1 ->
|
||||||
case efka_manifest:startup(Manifest) of
|
case efka_manifest:startup(Manifest) of
|
||||||
{ok, Port} ->
|
{ok, Port} ->
|
||||||
{os_pid, OSPid} = erlang:port_info(Port, os_pid),
|
{os_pid, OSPid} = erlang:port_info(Port, os_pid),
|
||||||
lager:debug("[efka_micro_service] reboot service success: ~p, port: ~p, os_pid: ~p", [ServiceId, Port, OSPid]),
|
lager:debug("[efka_service] reboot service success: ~p, port: ~p, os_pid: ~p", [ServiceId, Port, OSPid]),
|
||||||
{noreply, State#state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}};
|
{noreply, State#state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
lager:debug("[efka_micro_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]),
|
lager:debug("[efka_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]),
|
||||||
{noreply, State#state{running_status = ?STATUS_STOPPED}}
|
{noreply, State#state{running_status = ?STATUS_STOPPED}}
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
@ -269,18 +269,18 @@ handle_info({channel_reply, Ref, Reply}, State = #state{inflight = Inflight}) ->
|
|||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info({Port, {data, Data}}, State = #state{port = Port, service_id = ServiceId}) ->
|
handle_info({Port, {data, Data}}, State = #state{port = Port, service_id = ServiceId}) ->
|
||||||
lager:debug("[efka_micro_service] service_id: ~p, port data: ~p", [ServiceId, Data]),
|
lager:debug("[efka_service] service_id: ~p, port data: ~p", [ServiceId, Data]),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
%% 处理port的消息, Port的被动关闭会触发;因此这个时候的Port和State.port的值是相等的
|
%% 处理port的消息, Port的被动关闭会触发;因此这个时候的Port和State.port的值是相等的
|
||||||
handle_info({Port, {exit_status, Code}}, State = #state{service_id = ServiceId}) ->
|
handle_info({Port, {exit_status, Code}}, State = #state{service_id = ServiceId}) ->
|
||||||
lager:debug("[efka_micro_service] service_id: ~p, port: ~p, exit with code: ~p", [ServiceId, Port, Code]),
|
lager:debug("[efka_service] service_id: ~p, port: ~p, exit with code: ~p", [ServiceId, Port, Code]),
|
||||||
% erlang:start_timer(5000, self(), reboot_service),
|
% erlang:start_timer(5000, self(), reboot_service),
|
||||||
{noreply, State#state{port = undefined, os_pid = undefined, running_status = ?STATUS_STOPPED}};
|
{noreply, State#state{port = undefined, os_pid = undefined, running_status = ?STATUS_STOPPED}};
|
||||||
|
|
||||||
%% 处理channel进程的退出
|
%% 处理channel进程的退出
|
||||||
handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, service_id = ServiceId}) ->
|
handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, service_id = ServiceId}) ->
|
||||||
lager:debug("[efka_micro_service] service: ~p, channel exited: ~p", [ServiceId, Reason]),
|
lager:debug("[efka_service] service: ~p, channel exited: ~p", [ServiceId, Reason]),
|
||||||
{noreply, State#state{channel_pid = undefined, inflight = #{}}}.
|
{noreply, State#state{channel_pid = undefined, inflight = #{}}}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
@ -291,11 +291,11 @@ handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_
|
|||||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||||
State :: #state{}) -> term()).
|
State :: #state{}) -> term()).
|
||||||
terminate(Reason, _State = #state{os_pid = OSPid}) ->
|
terminate(Reason, _State = #state{os_pid = OSPid}) ->
|
||||||
lager:debug("[efka_micro_service] terminate with reason: ~p", [Reason]),
|
lager:debug("[efka_service] terminate with reason: ~p", [Reason]),
|
||||||
kill_os_pid(OSPid),
|
kill_os_pid(OSPid),
|
||||||
ok;
|
ok;
|
||||||
terminate(Reason, _State) ->
|
terminate(Reason, _State) ->
|
||||||
lager:debug("[efka_micro_service] terminate with reason: ~p", [Reason]),
|
lager:debug("[efka_service] terminate with reason: ~p", [Reason]),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
|
|||||||
@ -56,12 +56,12 @@ init([]) ->
|
|||||||
},
|
},
|
||||||
|
|
||||||
#{
|
#{
|
||||||
id => 'efka_server_sup',
|
id => 'efka_service_sup',
|
||||||
start => {'efka_micro_service_sup', start_link, []},
|
start => {'efka_service_sup', start_link, []},
|
||||||
restart => permanent,
|
restart => permanent,
|
||||||
shutdown => 2000,
|
shutdown => 2000,
|
||||||
type => supervisor,
|
type => supervisor,
|
||||||
modules => ['efka_micro_service_sup']
|
modules => ['efka_service_sup']
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
{ok, {SupFlags, ChildSpecs}}.
|
{ok, {SupFlags, ChildSpecs}}.
|
||||||
|
|||||||
@ -113,11 +113,11 @@ handle_cast(_Request, State = #state{}) ->
|
|||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
%% 注册
|
%% 注册
|
||||||
handle_info({tcp, Socket, <<PacketId:32, ?PACKET_REGISTER:8, ServiceId/binary>>}, State = #state{socket = Socket}) ->
|
handle_info({tcp, Socket, <<PacketId:32, ?PACKET_REGISTER:8, ServiceId/binary>>}, State = #state{socket = Socket}) ->
|
||||||
case efka_micro_service:get_pid(ServiceId) of
|
case efka_service:get_pid(ServiceId) of
|
||||||
undefined ->
|
undefined ->
|
||||||
ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_RESPONSE:8, 0:8, "service not running">>);
|
ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_RESPONSE:8, 0:8, "service not running">>);
|
||||||
Pid when is_pid(Pid) ->
|
Pid when is_pid(Pid) ->
|
||||||
case efka_micro_service:attach_channel(Pid, self()) of
|
case efka_service:attach_channel(Pid, self()) of
|
||||||
ok ->
|
ok ->
|
||||||
ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_RESPONSE:8, 1:8>>),
|
ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_RESPONSE:8, 1:8>>),
|
||||||
{noreply, State#state{service_id = ServiceId, service_pid = Pid, is_registered = true}};
|
{noreply, State#state{service_id = ServiceId, service_pid = Pid, is_registered = true}};
|
||||||
@ -129,23 +129,23 @@ handle_info({tcp, Socket, <<PacketId:32, ?PACKET_REGISTER:8, ServiceId/binary>>}
|
|||||||
|
|
||||||
%% 请求参数
|
%% 请求参数
|
||||||
handle_info({tcp, Socket, <<PacketId:32, ?PACKET_REQUEST_CONFIG:8>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) ->
|
handle_info({tcp, Socket, <<PacketId:32, ?PACKET_REQUEST_CONFIG:8>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) ->
|
||||||
{ok, ConfigJson} = efka_micro_service:request_config(ServicePid),
|
{ok, ConfigJson} = efka_service:request_config(ServicePid),
|
||||||
ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_RESPONSE:8, ConfigJson/binary>>),
|
ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_RESPONSE:8, ConfigJson/binary>>),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
%% 数据项
|
%% 数据项
|
||||||
handle_info({tcp, Socket, <<0:32, ?PACKET_METRIC_DATA:8, Len:8, DeviceUUID:Len/binary, Data/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) ->
|
handle_info({tcp, Socket, <<0:32, ?PACKET_METRIC_DATA:8, Len:8, DeviceUUID:Len/binary, Data/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) ->
|
||||||
efka_micro_service:metric_data(ServicePid, DeviceUUID, Data),
|
efka_service:metric_data(ServicePid, DeviceUUID, Data),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
%% Event事件
|
%% Event事件
|
||||||
handle_info({tcp, Socket, <<0:32, ?PACKET_EVENT:8, EventType:16, Params/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) ->
|
handle_info({tcp, Socket, <<0:32, ?PACKET_EVENT:8, EventType:16, Params/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) ->
|
||||||
efka_micro_service:send_event(ServicePid, EventType, Params),
|
efka_service:send_event(ServicePid, EventType, Params),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
%% AIEvent事件
|
%% AIEvent事件
|
||||||
handle_info({tcp, Socket, <<0:32, ?PACKET_AI_EVENT:8, EventType:16, Params/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) ->
|
handle_info({tcp, Socket, <<0:32, ?PACKET_AI_EVENT:8, EventType:16, Params/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) ->
|
||||||
efka_micro_service:send_ai_event(ServicePid, EventType, Params),
|
efka_service:send_ai_event(ServicePid, EventType, Params),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
%% 收到端上的响应
|
%% 收到端上的响应
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user