This commit is contained in:
anlicheng 2025-05-09 22:03:24 +08:00
parent 849fdb50a7
commit 3b4da2e10b
5 changed files with 487 additions and 684 deletions

View File

@ -23,47 +23,43 @@
-define(TASK_STATUS_FAILED, 0). %% 线 -define(TASK_STATUS_FAILED, 0). %% 线
-define(TASK_STATUS_OK, 1). %% 线 -define(TASK_STATUS_OK, 1). %% 线
%% efka主动发起的消息体类型 %% efka主动发起的消息体类型,
-define(PACKET_REQUEST, 16#01). -define(PACKET_REQUEST, 16#01).
-define(PACKET_RESPONSE, 16#02). -define(PACKET_RESPONSE, 16#02).
%% pub/sub的消息 %% pub/sub的消息,
-define(PACKET_PUB, 16#03). -define(PACKET_PUB, 16#03).
%% push调用不需要返回
%% push调用不需要返回,
-define(PACKET_COMMAND, 16#04). -define(PACKET_COMMAND, 16#04).
%% %%
-define(PACKET_ASYNC_REQUEST, 16#5). -define(PACKET_PUSH, 16#05).
-define(PACKET_ASYNC_RESPONSE, 16#6). -define(PACKET_PUSH_REPLY, 16#06).
%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-define(COMMAND_AUTH, 16#8). %%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% %%
%% websocket的register关系 -define(METHOD_AUTH, 16#01).
-define(METHOD_AUTH, 16#00).
-define(METHOD_CREATE_SESSION, 16#01).
-define(METHOD_DATA, 16#02). -define(METHOD_DATA, 16#02).
-define(METHOD_PING, 16#03). -define(METHOD_PING, 16#03).
-define(METHOD_INFORM, 16#04). -define(METHOD_INFORM, 16#04).
-define(METHOD_FEEDBACK_STEP, 16#05). -define(METHOD_EVENT, 16#05).
-define(METHOD_PHASE, 16#06).
-define(METHOD_EVENT, 16#07). %%%% ,
-define(METHOD_PHASE, 16#09).
%% %%
-define(METHOD_DEPLOY, 16#10). -define(COMMAND_AUTH, 16#08).
%% %%%% ,
-define(EVENT_DEVICE, 16#01).
%%
-define(EVENT_HOST, 16#02).
%% ai相关的事件
-define(EVENT_AI, 16#03).
%% -define(PUSH_DEPLOY, 16#01).
-define(DIRECTIVE_ZD_CTRL, 16#01). -define(PUSH_SERVICE_CONFIG, 16#02).
-define(PUSH_INVOKE, 16#03).
-define(PUSH_TASK_LOG, 16#04).
%% %%
-record(kv, { -record(kv, {
@ -85,14 +81,6 @@
fail_num = 0 fail_num = 0
}). }).
%%
-record(totalizator, {
key :: {SceneId :: integer(), Date :: calendar:date()},
scene_id :: integer(),
date :: calendar:date(),
option :: #option{}
}).
%% %%
-record(north_data, { -record(north_data, {
id = 0 :: integer(), id = 0 :: integer(),

View File

@ -27,6 +27,23 @@
}). }).
-endif. -endif.
-ifndef('PUB_PB_H').
-define('PUB_PB_H', true).
-record(pub,
{topic = <<>> :: unicode:chardata() | undefined, % = 1, optional
content = <<>> :: unicode:chardata() | undefined % = 2, optional
}).
-endif.
-ifndef('PUSH_REPLY_PB_H').
-define('PUSH_REPLY_PB_H', true).
-record(push_reply,
{code = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits
result = <<>> :: unicode:chardata() | undefined, % = 2, optional
message = <<>> :: unicode:chardata() | undefined % = 3, optional
}).
-endif.
-ifndef('DEPLOY_PB_H'). -ifndef('DEPLOY_PB_H').
-define('DEPLOY_PB_H', true). -define('DEPLOY_PB_H', true).
-record(deploy, -record(deploy,
@ -36,37 +53,19 @@
}). }).
-endif. -endif.
-ifndef('EFKA_RESPONSE_PB_H'). -ifndef('FETCH_TASK_LOG_PB_H').
-define('EFKA_RESPONSE_PB_H', true). -define('FETCH_TASK_LOG_PB_H', true).
-record(efka_response, -record(fetch_task_log,
{code = 0 :: integer() | undefined, % = 1, optional, 32 bits {task_id = 0 :: non_neg_integer() | undefined % = 1, optional, 32 bits
result = <<>> :: unicode:chardata() | undefined, % = 2, optional
message = <<>> :: unicode:chardata() | undefined % = 3, optional
}). }).
-endif. -endif.
-ifndef('PUB_PB_H'). -ifndef('INVOKE_PB_H').
-define('PUB_PB_H', true). -define('INVOKE_PB_H', true).
-record(pub, -record(invoke,
{topic = <<>> :: unicode:chardata() | undefined, % = 1, optional {service_id = <<>> :: unicode:chardata() | undefined, % = 1, optional
content = <<>> :: unicode:chardata() | undefined % = 2, optional payload = <<>> :: unicode:chardata() | undefined, % = 2, optional
}). timeout = 0 :: non_neg_integer() | undefined % = 3, optional, 32 bits
-endif.
-ifndef('ASYNC_REQUEST_PB_H').
-define('ASYNC_REQUEST_PB_H', true).
-record(async_request,
{method = <<>> :: unicode:chardata() | undefined, % = 1, optional
params = <<>> :: unicode:chardata() | undefined % = 2, optional
}).
-endif.
-ifndef('ASYNC_RESPONSE_PB_H').
-define('ASYNC_RESPONSE_PB_H', true).
-record(async_response,
{code = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits
result = <<>> :: unicode:chardata() | undefined, % = 2, optional
message = <<>> :: unicode:chardata() | undefined % = 3, optional
}). }).
-endif. -endif.
@ -117,15 +116,6 @@
}). }).
-endif. -endif.
-ifndef('FEEDBACK_PHASE_PB_H').
-define('FEEDBACK_PHASE_PB_H', true).
-record(feedback_phase,
{task_id = <<>> :: unicode:chardata() | undefined, % = 1, optional
phase = <<>> :: unicode:chardata() | undefined, % = 2, optional
timestamp = 0 :: non_neg_integer() | undefined % = 3, optional, 32 bits
}).
-endif.
-ifndef('EVENT_PB_H'). -ifndef('EVENT_PB_H').
-define('EVENT_PB_H', true). -define('EVENT_PB_H', true).
-record(event, -record(event,

View File

@ -201,7 +201,7 @@ handle_event({call, From}, {async_request, ReceiverPid, Method, Params}, _, Stat
true -> true ->
%% websocket发送请求 %% websocket发送请求
lager:debug("[iot_host] host: ~p, will async_request method: ~p, params: ~p", [UUID, Method, Params]), lager:debug("[iot_host] host: ~p, will async_request method: ~p, params: ~p", [UUID, Method, Params]),
Ref = tcp_channel:async_request(ChannelPid, ReceiverPid, Method, Params), Ref = tcp_channel:push(ChannelPid, ReceiverPid, Method, Params),
{keep_state, State, [{reply, From, {ok, Ref}}]}; {keep_state, State, [{reply, From, {ok, Ref}}]};
false -> false ->
@ -345,10 +345,6 @@ handle_event(cast, {handle, {inform, Inform}}, ?STATE_ACTIVATED, State = #state{
%% props id:id:id %% props id:id:id
{keep_state, State}; {keep_state, State};
handle_event(cast, {handle, {feedback_result, FeedbackResult}}, ?STATE_ACTIVATED, State = #state{has_session = true}) ->
#feedback_phase{task_id = TaskId} = FeedbackResult,
{keep_state, State};
handle_event(cast, {handle, {event, Event}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) -> handle_event(cast, {handle, {event, Event}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
lager:debug("[iot_host] uuid: ~p, get event: ~p", [UUID, Event]), lager:debug("[iot_host] uuid: ~p, get event: ~p", [UUID, Event]),
#event{service_id = ServiceId, event_type = EventType, params = Params} = Event, #event{service_id = ServiceId, event_type = EventType, params = Params} = Event,

File diff suppressed because it is too large Load Diff

View File

@ -13,7 +13,7 @@
%% API %% API
-export([stop/2, send/2]). -export([stop/2, send/2]).
-export([pub/3, async_request/4, command/3]). -export([pub/3, push/4, command/3]).
-export([start_link/2]). -export([start_link/2]).
%% gen_server callbacks %% gen_server callbacks
@ -44,10 +44,10 @@ command(Pid, CommandType, Command) when is_pid(Pid), is_integer(CommandType), is
gen_server:cast(Pid, {command, CommandType, Command}). gen_server:cast(Pid, {command, CommandType, Command}).
%% %%
-spec async_request(Pid :: pid(), ReceiverPid :: pid(), Method :: binary(), Params :: binary()) -> Ref :: reference(). -spec push(Pid :: pid(), ReceiverPid :: pid(), Method :: binary(), Params :: binary()) -> Ref :: reference().
async_request(Pid, ReceiverPid, Method, Params) when is_pid(Pid), is_binary(Method), is_binary(Params) -> push(Pid, ReceiverPid, PushType, PushBin) when is_pid(Pid), is_integer(PushType), is_binary(PushBin) ->
Ref = make_ref(), Ref = make_ref(),
gen_server:cast(Pid, {async_request, ReceiverPid, Ref, Method, Params}), gen_server:cast(Pid, {push, ReceiverPid, Ref, PushType, PushBin}),
Ref. Ref.
%% %%
@ -81,19 +81,17 @@ handle_call(_Request, _From, State) ->
%% , pub/sub机制 %% , pub/sub机制
handle_cast({pub, Topic, Content}, State = #state{transport = Transport, socket = Socket}) -> handle_cast({pub, Topic, Content}, State = #state{transport = Transport, socket = Socket}) ->
PubBin = message_pb:encode_msg(#pub{topic = Topic, content = Content}), PubBin = message_pb:encode_msg(#pub{topic = Topic, content = Content}),
Transport:send(Socket, <<?PACKET_PUB, 0:32, PubBin/binary>>), Transport:send(Socket, <<?PACKET_PUB, PubBin/binary>>),
{noreply, State}; {noreply, State};
%% Command消息 %% Command消息
handle_cast({command, CommandType, Command}, State = #state{transport = Transport, socket = Socket}) -> handle_cast({command, CommandType, Command}, State = #state{transport = Transport, socket = Socket}) ->
Transport:send(Socket, <<?PACKET_COMMAND, 0:32, CommandType:8, Command/binary>>), Transport:send(Socket, <<?PACKET_COMMAND, CommandType:8, Command/binary>>),
{noreply, State}; {noreply, State};
handle_cast({async_request, ReceiverPid, Ref, Method, Params}, %%
State = #state{transport = Transport, socket = Socket, packet_id = PacketId, inflight = Inflight}) -> handle_cast({push, ReceiverPid, Ref, PushType, PushBin}, State = #state{transport = Transport, socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
RequestBin = message_pb:encode_msg(#async_request{method = Method, params = Params}), Transport:send(Socket, <<?PACKET_PUSH, PacketId:32, PushType:8, PushBin/binary>>),
Transport:send(Socket, <<?PACKET_ASYNC_REQUEST, PacketId:32, RequestBin/binary>>),
{noreply, State#state{packet_id = PacketId + 1, inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}. {noreply, State#state{packet_id = PacketId + 1, inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}.
%% auth验证 %% auth验证
@ -138,45 +136,40 @@ handle_info({tcp, Socket, <<?PACKET_REQUEST, PacketId:32, ?METHOD_AUTH:8, AuthRe
{stop, State} {stop, State}
end; end;
handle_info({tcp, Socket, <<?PACKET_REQUEST, 0:32, ?METHOD_DATA:8, Data0/binary>>}, State = #state{socket = Socket, host_pid = HostPid}) when is_pid(HostPid) -> handle_info({tcp, Socket, <<?PACKET_REQUEST, ?METHOD_DATA:8, Data0/binary>>}, State = #state{socket = Socket, host_pid = HostPid}) when is_pid(HostPid) ->
Data = message_pb:decode_msg(Data0, data), Data = message_pb:decode_msg(Data0, data),
iot_host:handle(HostPid, {data, Data}), iot_host:handle(HostPid, {data, Data}),
{noreply, State}; {noreply, State};
handle_info({tcp, Socket, <<?PACKET_REQUEST, 0:32, ?METHOD_PING:8, PingData/binary>>}, State = #state{socket = Socket, host_pid = HostPid}) when is_pid(HostPid) -> handle_info({tcp, Socket, <<?PACKET_REQUEST, ?METHOD_PING:8, PingData/binary>>}, State = #state{socket = Socket, host_pid = HostPid}) when is_pid(HostPid) ->
Ping = message_pb:decode_msg(PingData, ping), Ping = message_pb:decode_msg(PingData, ping),
iot_host:handle(HostPid, {ping, Ping}), iot_host:handle(HostPid, {ping, Ping}),
{noreply, State}; {noreply, State};
handle_info({tcp, Socket, <<?PACKET_REQUEST, 0:32, ?METHOD_INFORM:8, InformData/binary>>}, State = #state{socket = Socket, host_pid = HostPid}) when is_pid(HostPid) -> handle_info({tcp, Socket, <<?PACKET_REQUEST, ?METHOD_INFORM:8, InformData/binary>>}, State = #state{socket = Socket, host_pid = HostPid}) when is_pid(HostPid) ->
ServiceInform = message_pb:decode_msg(InformData, service_inform), ServiceInform = message_pb:decode_msg(InformData, service_inform),
iot_host:handle(HostPid, {inform, ServiceInform}), iot_host:handle(HostPid, {inform, ServiceInform}),
{noreply, State}; {noreply, State};
handle_info({tcp, Socket, <<?PACKET_REQUEST, 0:32, ?METHOD_FEEDBACK_STEP:8, FeedbackStepData/binary>>}, State = #state{socket = Socket, host_pid = HostPid}) when is_pid(HostPid) -> handle_info({tcp, Socket, <<?PACKET_REQUEST, ?METHOD_EVENT:8, EventData/binary>>}, State = #state{socket = Socket, host_pid = HostPid}) when is_pid(HostPid) ->
FeedbackStep = message_pb:decode_msg(FeedbackStepData, feedback_step),
iot_host:handle(HostPid, {feedback_step, FeedbackStep}),
{noreply, State};
handle_info({tcp, Socket, <<?PACKET_REQUEST, 0:32, ?METHOD_EVENT:8, EventData/binary>>}, State = #state{socket = Socket, host_pid = HostPid}) when is_pid(HostPid) ->
Event = message_pb:decode_msg(EventData, event), Event = message_pb:decode_msg(EventData, event),
iot_host:handle(HostPid, {event, Event}), iot_host:handle(HostPid, {event, Event}),
{noreply, State}; {noreply, State};
%% %%
handle_info({tcp, Socket, <<?PACKET_ASYNC_RESPONSE, PacketId:32, ResponseBin/binary>>}, State = #state{socket = Socket, uuid = UUID, inflight = Inflight}) when PacketId > 0 -> handle_info({tcp, Socket, <<?PACKET_PUSH_REPLY, PacketId:32, ResponseBin/binary>>}, State = #state{socket = Socket, uuid = UUID, inflight = Inflight}) when PacketId > 0 ->
AsyncResponse = message_pb:decode_msg(ResponseBin, async_response), PushReply = message_pb:decode_msg(ResponseBin, push_reply),
lager:debug("[ws_channel] uuid: ~p, get async_response message: ~p, packet_id: ~p", [UUID, AsyncResponse, PacketId]), lager:debug("[ws_channel] uuid: ~p, get push_reply: ~p, packet_id: ~p", [UUID, PushReply, PacketId]),
case maps:take(PacketId, Inflight) of case maps:take(PacketId, Inflight) of
error -> error ->
lager:warning("[ws_channel] get unknown async_response message: ~p, packet_id: ~p", [AsyncResponse, PacketId]), lager:warning("[ws_channel] get unknown async_response message: ~p, packet_id: ~p", [PushReply, PacketId]),
{noreply, State}; {noreply, State};
{{ReceiverPid, Ref}, NInflight} -> {{ReceiverPid, Ref}, NInflight} ->
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
true -> true ->
ReceiverPid ! {async_response, Ref, AsyncResponse}; ReceiverPid ! {push_reply, Ref, PushReply};
false -> false ->
lager:warning("[ws_channel] get async_response message: ~p, packet_id: ~p, but receiver_pid is deaded", [AsyncResponse, PacketId]) lager:warning("[ws_channel] get async_response message: ~p, packet_id: ~p, but receiver_pid is deaded", [PushReply, PacketId])
end, end,
{noreply, State#state{inflight = NInflight}} {noreply, State#state{inflight = NInflight}}
end; end;