This commit is contained in:
anlicheng 2025-05-21 20:19:10 +08:00
parent 8786df0525
commit 764a7f67c0

View File

@ -34,8 +34,8 @@
-record(state, {
transport_pid :: undefined | pid(),
%% #{Ref => PacketId}
inflight = #{}
%% , #{Ref => PacketId}
push_inflight = #{}
}).
%%%===================================================================
@ -98,6 +98,13 @@ callback_mode() ->
%% @doc If callback_mode is handle_event_function, then whenever a
%% gen_statem receives an event from call/2, cast/2, or as a normal
%% process message, this function is called.
handle_event({call, From}, {request_service_config, ReceiverPid, ServiceId}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
Ref = efka_transport:request(TransportPid, ReceiverPid, ?METHOD_REQUEST_SERVICE_CONFIG, ServiceId),
{keep_state, State, [{reply, From, {ok, Ref}}]};
handle_event({call, From}, {request_service_config, _ReceiverPid, _ServiceId}, _, State) ->
{keep_state, State, [{reply, From, {error, <<"transport is not alive">>}}]};
handle_event(cast, {metric_data, ServiceId, DeviceUUID, LineProtocolData}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
Packet = message_pb:encode_msg(#data{
service_id = ServiceId,
@ -255,7 +262,7 @@ handle_event(info, {server_push, PacketId, <<?PUSH_STOP_SERVICE:8, ServiceId/bin
{keep_state, State};
%% config.json配置信息
handle_event(info, {server_push, PacketId, <<?PUSH_SERVICE_CONFIG:8, ConfigBin/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid, inflight = Inflight}) ->
handle_event(info, {server_push, PacketId, <<?PUSH_SERVICE_CONFIG:8, ConfigBin/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid, push_inflight = PushInflight}) ->
#push_service_config{service_id = ServiceId, config_json = ConfigJson, timeout = Timeout} = message_pb:decode_msg(ConfigBin, push_service_config),
case efka_service:get_pid(ServiceId) of
@ -270,11 +277,11 @@ handle_event(info, {server_push, PacketId, <<?PUSH_SERVICE_CONFIG:8, ConfigBin/b
%%
erlang:start_timer(Timeout, self(), {request_timeout, Ref}),
{keep_state, State#state{inflight = maps:put(Ref, PacketId, Inflight)}}
{keep_state, State#state{push_inflight = maps:put(Ref, PacketId, PushInflight)}}
end;
%%
handle_event(info, {server_push, PacketId, <<?PUSH_INVOKE:8, InvokeBin/binary>>}, ?STATE_ACTIVATED, State = #state{inflight = Inflight, transport_pid = TransportPid}) ->
handle_event(info, {server_push, PacketId, <<?PUSH_INVOKE:8, InvokeBin/binary>>}, ?STATE_ACTIVATED, State = #state{push_inflight = PushInflight, transport_pid = TransportPid}) ->
#invoke{service_id = ServiceId, payload = Payload, timeout = Timeout} = message_pb:decode_msg(InvokeBin, invoke),
%%
case efka_service:get_pid(ServiceId) of
@ -289,7 +296,7 @@ handle_event(info, {server_push, PacketId, <<?PUSH_INVOKE:8, InvokeBin/binary>>}
%%
erlang:start_timer(Timeout, self(), {request_timeout, Ref}),
{keep_state, State#state{inflight = maps:put(Ref, PacketId, Inflight)}}
{keep_state, State#state{push_inflight = maps:put(Ref, PacketId, PushInflight)}}
end;
%% task_log
@ -331,11 +338,11 @@ handle_event(info, {server_pub, Topic, Content}, ?STATE_ACTIVATED, State) ->
{keep_state, State};
%% efka_service的回复
handle_event(info, {service_reply, Ref, EmsReply}, ?STATE_ACTIVATED, State = #state{inflight = Inflight, transport_pid = TransportPid}) ->
case maps:take(Ref, Inflight) of
handle_event(info, {service_reply, Ref, EmsReply}, ?STATE_ACTIVATED, State = #state{push_inflight = PushInflight, transport_pid = TransportPid}) ->
case maps:take(Ref, PushInflight) of
error ->
{keep_state, State};
{PacketId, NInflight} ->
{PacketId, NPushInflight} ->
Reply = case EmsReply of
{ok, Result} ->
#async_call_reply{code = 1, result = Result};
@ -344,19 +351,19 @@ handle_event(info, {service_reply, Ref, EmsReply}, ?STATE_ACTIVATED, State = #st
end,
efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)),
{keep_state, State#state{inflight = NInflight}}
{keep_state, State#state{push_inflight = NPushInflight}}
end;
%% todo
handle_event(info, {timeout, _, {request_timeout, Ref}}, ?STATE_ACTIVATED, State = #state{inflight = Inflight, transport_pid = TransportPid}) ->
case maps:take(Ref, Inflight) of
handle_event(info, {timeout, _, {request_timeout, Ref}}, ?STATE_ACTIVATED, State = #state{push_inflight = PushInflight, transport_pid = TransportPid}) ->
case maps:take(Ref, PushInflight) of
error ->
{keep_state, State};
{PacketId, NInflight} ->
{PacketId, NPushInflight} ->
Reply = #async_call_reply{code = 0, message = <<"reqeust timeout">>, result = <<>>},
efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)),
{keep_state, State#state{inflight = NInflight}}
{keep_state, State#state{push_inflight = NPushInflight}}
end;
%% transport进程退出