diff --git a/apps/efka/src/efka_agent2.erl b/apps/efka/src/efka_agent2.erl index 7363564..902015f 100644 --- a/apps/efka/src/efka_agent2.erl +++ b/apps/efka/src/efka_agent2.erl @@ -34,8 +34,8 @@ -record(state, { transport_pid :: undefined | pid(), - %% 映射关系 #{Ref => PacketId} - inflight = #{} + %% 服务器端推送的消息的未确认列表, 映射关系 #{Ref => PacketId} + push_inflight = #{} }). %%%=================================================================== @@ -98,6 +98,13 @@ callback_mode() -> %% @doc If callback_mode is handle_event_function, then whenever a %% gen_statem receives an event from call/2, cast/2, or as a normal %% process message, this function is called. +handle_event({call, From}, {request_service_config, ReceiverPid, ServiceId}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> + Ref = efka_transport:request(TransportPid, ReceiverPid, ?METHOD_REQUEST_SERVICE_CONFIG, ServiceId), + {keep_state, State, [{reply, From, {ok, Ref}}]}; + +handle_event({call, From}, {request_service_config, _ReceiverPid, _ServiceId}, _, State) -> + {keep_state, State, [{reply, From, {error, <<"transport is not alive">>}}]}; + handle_event(cast, {metric_data, ServiceId, DeviceUUID, LineProtocolData}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> Packet = message_pb:encode_msg(#data{ service_id = ServiceId, @@ -255,7 +262,7 @@ handle_event(info, {server_push, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid, inflight = Inflight}) -> +handle_event(info, {server_push, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid, push_inflight = PushInflight}) -> #push_service_config{service_id = ServiceId, config_json = ConfigJson, timeout = Timeout} = message_pb:decode_msg(ConfigBin, push_service_config), case efka_service:get_pid(ServiceId) of @@ -270,11 +277,11 @@ handle_event(info, {server_push, PacketId, <>}, ?STATE_ACTIVATED, State = #state{inflight = Inflight, transport_pid = TransportPid}) -> +handle_event(info, {server_push, PacketId, <>}, ?STATE_ACTIVATED, State = #state{push_inflight = PushInflight, transport_pid = TransportPid}) -> #invoke{service_id = ServiceId, payload = Payload, timeout = Timeout} = message_pb:decode_msg(InvokeBin, invoke), %% 消息发送到订阅系统 case efka_service:get_pid(ServiceId) of @@ -289,7 +296,7 @@ handle_event(info, {server_push, PacketId, <>} %% 处理超时逻辑 erlang:start_timer(Timeout, self(), {request_timeout, Ref}), - {keep_state, State#state{inflight = maps:put(Ref, PacketId, Inflight)}} + {keep_state, State#state{push_inflight = maps:put(Ref, PacketId, PushInflight)}} end; %% 处理task_log @@ -331,11 +338,11 @@ handle_event(info, {server_pub, Topic, Content}, ?STATE_ACTIVATED, State) -> {keep_state, State}; %% 收到来自efka_service的回复 -handle_event(info, {service_reply, Ref, EmsReply}, ?STATE_ACTIVATED, State = #state{inflight = Inflight, transport_pid = TransportPid}) -> - case maps:take(Ref, Inflight) of +handle_event(info, {service_reply, Ref, EmsReply}, ?STATE_ACTIVATED, State = #state{push_inflight = PushInflight, transport_pid = TransportPid}) -> + case maps:take(Ref, PushInflight) of error -> {keep_state, State}; - {PacketId, NInflight} -> + {PacketId, NPushInflight} -> Reply = case EmsReply of {ok, Result} -> #async_call_reply{code = 1, result = Result}; @@ -344,19 +351,19 @@ handle_event(info, {service_reply, Ref, EmsReply}, ?STATE_ACTIVATED, State = #st end, efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)), - {keep_state, State#state{inflight = NInflight}} + {keep_state, State#state{push_inflight = NPushInflight}} end; %% todo 请求超时逻辑处理 -handle_event(info, {timeout, _, {request_timeout, Ref}}, ?STATE_ACTIVATED, State = #state{inflight = Inflight, transport_pid = TransportPid}) -> - case maps:take(Ref, Inflight) of +handle_event(info, {timeout, _, {request_timeout, Ref}}, ?STATE_ACTIVATED, State = #state{push_inflight = PushInflight, transport_pid = TransportPid}) -> + case maps:take(Ref, PushInflight) of error -> {keep_state, State}; - {PacketId, NInflight} -> + {PacketId, NPushInflight} -> Reply = #async_call_reply{code = 0, message = <<"reqeust timeout">>, result = <<>>}, efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)), - {keep_state, State#state{inflight = NInflight}} + {keep_state, State#state{push_inflight = NPushInflight}} end; %% transport进程退出