fix message

This commit is contained in:
anlicheng 2025-09-18 11:30:38 +08:00
parent 8181d1af19
commit 63562f2e6a
5 changed files with 240 additions and 2881 deletions

View File

@ -0,0 +1,94 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%% , 1: topic的pub/sub机制; 2. target的单点通讯和广播
%%% @end
%%% Created : 21. 4 2025 17:28
%%%-------------------------------------------------------------------
-author("anlicheng").
%% efka主动发起的消息体类型,
-define(PACKET_REQUEST, 16#01).
-define(PACKET_RESPONSE, 16#02).
%% efka主动发起不需要返回的数据
-define(PACKET_CAST, 16#03).
%% pub/sub的消息,
-define(PACKET_PUB, 16#04).
%% push调用不需要返回,
-define(PACKET_COMMAND, 16#05).
%% RPC调用
%% Service通讯,
-define(PACKET_RPC, 16#10).
-define(PACKET_RPC_REPLY, 16#11).
%% ping包
-define(PACKET_PING, 16#FF).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
-define(MESSAGE_AUTH_REQUEST, 16#01).
-define(MESSAGE_AUTH_REPLY, 16#02).
-define(MESSAGE_PUB, 16#03).
-define(MESSAGE_COMMAND, 16#04).
-define(MESSAGE_RPC_DEPLOY, 16#05).
-define(MESSAGE_RPC_CONTAINER, 16#06).
-define(MESSAGE_DATA, 16#07).
-define(MESSAGE_EVENT, 16#08).
%%%% ,
%%
-define(COMMAND_AUTH, 16#08).
-record(auth_request, {
uuid :: binary(),
username :: binary(),
salt :: binary(),
token :: binary(),
timestamp :: integer()
}).
-record(auth_reply, {
code :: integer(),
message :: binary()
}).
-record(pub, {
topic :: binary(),
content :: binary()
}).
-record(command, {
command_type :: integer(),
command :: binary()
}).
-record(rpc_deploy, {
task_id :: integer(),
config :: binary()
}).
-record(rpc_container, {
method :: binary(),
container_name :: binary(),
params = <<>> :: binary()
}).
-record(data, {
service_id :: binary(),
device_uuid :: binary(),
route_key :: binary(),
metric :: binary()
}).
-record(event, {
service_id :: binary(),
event_type :: integer(),
params :: binary()
}).

View File

@ -1,133 +0,0 @@
%% -*- coding: utf-8 -*-
%% Automatically generated, do not edit
%% Generated by gpb_compile version 4.21.1
-ifndef(message_pb).
-define(message_pb, true).
-define(message_pb_gpb_version, "4.21.1").
-ifndef('AUTH_REQUEST_PB_H').
-define('AUTH_REQUEST_PB_H', true).
-record(auth_request,
{uuid = <<>> :: unicode:chardata() | undefined, % = 1, optional
username = <<>> :: unicode:chardata() | undefined, % = 2, optional
salt = <<>> :: unicode:chardata() | undefined, % = 4, optional
token = <<>> :: unicode:chardata() | undefined, % = 5, optional
timestamp = 0 :: non_neg_integer() | undefined % = 6, optional, 32 bits
}).
-endif.
-ifndef('AUTH_REPLY_PB_H').
-define('AUTH_REPLY_PB_H', true).
-record(auth_reply,
{code = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits
message = <<>> :: unicode:chardata() | undefined % = 2, optional
}).
-endif.
-ifndef('PUB_PB_H').
-define('PUB_PB_H', true).
-record(pub,
{topic = <<>> :: unicode:chardata() | undefined, % = 1, optional
content = <<>> :: iodata() | undefined % = 2, optional
}).
-endif.
-ifndef('COMMAND_PB_H').
-define('COMMAND_PB_H', true).
-record(command,
{command_type = <<>> :: unicode:chardata() | undefined, % = 1, optional
command = <<>> :: iodata() | undefined % = 2, optional
}).
-endif.
-ifndef('RPC_DEPLOY_PB_H').
-define('RPC_DEPLOY_PB_H', true).
-record(rpc_deploy,
{packet_id = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits
task_id = 0 :: non_neg_integer() | undefined, % = 2, optional, 32 bits
config = <<>> :: unicode:chardata() | undefined % = 3, optional
}).
-endif.
-ifndef('RPC_START_CONTAINER_PB_H').
-define('RPC_START_CONTAINER_PB_H', true).
-record(rpc_start_container,
{packet_id = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits
container_name = <<>> :: unicode:chardata() | undefined % = 2, optional
}).
-endif.
-ifndef('RPC_STOP_CONTAINER_PB_H').
-define('RPC_STOP_CONTAINER_PB_H', true).
-record(rpc_stop_container,
{packet_id = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits
container_name = <<>> :: unicode:chardata() | undefined % = 2, optional
}).
-endif.
-ifndef('RPC_CONFIG_CONTAINER_PB_H').
-define('RPC_CONFIG_CONTAINER_PB_H', true).
-record(rpc_config_container,
{packet_id = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits
container_name = <<>> :: unicode:chardata() | undefined, % = 2, optional
config = <<>> :: iodata() | undefined % = 3, optional
}).
-endif.
-ifndef('FETCH_TASK_LOG_PB_H').
-define('FETCH_TASK_LOG_PB_H', true).
-record(fetch_task_log,
{task_id = 0 :: non_neg_integer() | undefined % = 1, optional, 32 bits
}).
-endif.
-ifndef('CONTAINER_CONFIG_PB_H').
-define('CONTAINER_CONFIG_PB_H', true).
-record(container_config,
{container_name = <<>> :: unicode:chardata() | undefined, % = 1, optional
config = <<>> :: iodata() | undefined % = 2, optional
}).
-endif.
-ifndef('DATA_PB_H').
-define('DATA_PB_H', true).
-record(data,
{service_id = <<>> :: unicode:chardata() | undefined, % = 1, optional
device_uuid = <<>> :: unicode:chardata() | undefined, % = 2, optional
route_key = <<>> :: unicode:chardata() | undefined, % = 3, optional
metric = <<>> :: iodata() | undefined % = 4, optional
}).
-endif.
-ifndef('EVENT_PB_H').
-define('EVENT_PB_H', true).
-record(event,
{service_id = <<>> :: unicode:chardata() | undefined, % = 1, optional
event_type = 0 :: non_neg_integer() | undefined, % = 2, optional, 32 bits
params = <<>> :: unicode:chardata() | undefined % = 3, optional
}).
-endif.
-ifndef('PING_PB_H').
-define('PING_PB_H', true).
-record(ping,
{adcode = <<>> :: unicode:chardata() | undefined, % = 1, optional
boot_time = 0 :: non_neg_integer() | undefined, % = 2, optional, 32 bits
province = <<>> :: unicode:chardata() | undefined, % = 3, optional
city = <<>> :: unicode:chardata() | undefined, % = 4, optional
efka_version = <<>> :: unicode:chardata() | undefined, % = 5, optional
kernel_arch = <<>> :: unicode:chardata() | undefined, % = 6, optional
ips = [] :: [unicode:chardata()] | undefined, % = 7, repeated
cpu_core = 0 :: non_neg_integer() | undefined, % = 8, optional, 32 bits
cpu_load = 0 :: non_neg_integer() | undefined, % = 9, optional, 32 bits
cpu_temperature = 0.0 :: float() | integer() | infinity | '-infinity' | nan | undefined, % = 10, optional
disk = [] :: [integer()] | undefined, % = 11, repeated, 32 bits
memory = [] :: [integer()] | undefined, % = 12, repeated, 32 bits
interfaces = <<>> :: unicode:chardata() | undefined % = 13, optional
}).
-endif.
-endif.

View File

@ -0,0 +1,123 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 17. 9 2025 16:05
%%%-------------------------------------------------------------------
-module(message_codec).
-author("anlicheng").
-include("message.hrl").
-define(I32, 1).
-define(Bytes, 2).
%% API
-export([encode/2, decode/1]).
-spec encode0(Message :: any()) -> binary().
encode(PacketType, Packet) when is_integer(PacketType) ->
Bin = encode0(Packet),
<<PacketType, Bin/binary>>.
encode0(#auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}) ->
iolist_to_binary([
marshal(?Bytes, UUID),
marshal(?Bytes, Username),
marshal(?Bytes, Salt),
marshal(?Bytes, Token),
marshal(?I32, Timestamp)
]);
encode0(#auth_reply{code = Code, message = Message}) ->
iolist_to_binary([
marshal(?I32, Code),
marshal(?Bytes, Message)
]);
encode0(#pub{topic = Topic, content = Content}) ->
iolist_to_binary([
marshal(?Bytes, Topic),
marshal(?Bytes, Content)
]);
encode0(#command{command_type = CommandType, command = Command}) ->
iolist_to_binary([
marshal(?I32, CommandType),
marshal(?Bytes, Command)
]);
encode0(#rpc_deploy{task_id = TaskId, config = Config}) ->
iolist_to_binary([
marshal(?I32, TaskId),
marshal(?Bytes, Config)
]);
encode0(#rpc_container{method = Method, container_name = ContainerName, params = Params}) ->
iolist_to_binary([
marshal(?Bytes, Method),
marshal(?Bytes, ContainerName),
marshal(?Bytes, Params)
]);
encode0(#data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}) ->
iolist_to_binary([
marshal(?Bytes, ServiceId),
marshal(?Bytes, DeviceUUID),
marshal(?Bytes, RouteKey),
marshal(?Bytes, Metric)
]);
encode0(#event{service_id = ServiceId, event_type = EventType, params = Params}) ->
iolist_to_binary([
marshal(?Bytes, ServiceId),
marshal(?I32, EventType),
marshal(?Bytes, Params)
]).
-spec decode(Bin :: binary()) -> {ok, Message :: any()} | error.
decode(<<PacketType:8, Packet/binary>>) ->
case unmarshal(Packet) of
{ok, Fields} ->
decode0(PacketType, Fields);
error ->
error
end.
decode0(?MESSAGE_AUTH_REQUEST, [UUID, Username, Salt, Token, Timestamp]) ->
{ok, #auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}};
decode0(?MESSAGE_AUTH_REPLY, [Code, Message]) ->
{ok, #auth_reply{code = Code, message = Message}};
decode0(?MESSAGE_PUB, [Topic, Content]) ->
{ok, #pub{topic = Topic, content = Content}};
decode0(?MESSAGE_COMMAND, [CommandType, Command]) ->
{ok, #command{command_type = CommandType, command = Command}};
decode0(?MESSAGE_RPC_DEPLOY, [TaskId, Config]) ->
{ok, #rpc_deploy{task_id = TaskId, config = Config}};
decode0(?MESSAGE_RPC_CONTAINER, [Method, ContainerName, Params]) ->
{ok, #rpc_container{method = Method, container_name = ContainerName, params = Params}};
decode0(?MESSAGE_DATA, [ServiceId, DeviceUUID, RouteKey, Metric]) ->
{ok, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}};
decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) ->
{ok, #event{service_id = ServiceId, event_type = EventType, params = Params}};
decode0(_, _) ->
error.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% helper methods
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec marshal(Type :: integer(), Field :: any()) -> binary().
marshal(?I32, Field) when is_integer(Field) ->
<<?I32, Field:32>>;
marshal(?Bytes, Field) when is_binary(Field) ->
Len = byte_size(Field),
<<?Bytes, Len:16, Field/binary>>;
marshal(?Bytes, Field0) ->
Field = unicode:characters_to_binary(Field0),
Len = byte_size(Field),
<<?Bytes, Len:16, Field/binary>>.
-spec unmarshal(Bin :: binary()) -> {ok, Components :: [any()]} | error.
unmarshal(Bin) when is_binary(Bin) ->
unmarshal(Bin, []).
unmarshal(<<>>, Acc) ->
{ok, lists:reverse(Acc)};
unmarshal(<<?I32, F:32, Rest/binary>>, Acc) ->
unmarshal(Rest, [F|Acc]);
unmarshal(<<?Bytes, Len:16, F:Len/binary, Rest/binary>>, Acc) ->
unmarshal(Rest, [F|Acc]);
unmarshal(_, _) ->
error.

File diff suppressed because it is too large Load Diff

View File

@ -8,8 +8,7 @@
%%%-------------------------------------------------------------------
-module(tcp_channel).
-author("licheng5").
-include("iot.hrl").
-include("message_pb.hrl").
-include("message.hrl").
%% API
-export([pub/3, async_call/4, command/3]).
@ -80,7 +79,7 @@ handle_call(_Request, _From, State) ->
%% , pub/sub机制
handle_cast({pub, Topic, Content}, State = #state{transport = Transport, socket = Socket}) ->
PubBin = message_pb:encode_msg(#pub{topic = Topic, content = Content}),
PubBin = message_codec:encode(?MESSAGE_PUB, #pub{topic = Topic, content = Content}),
Transport:send(Socket, <<?PACKET_PUB, PubBin/binary>>),
{noreply, State};
@ -91,12 +90,12 @@ handle_cast({command, CommandType, Command}, State = #state{transport = Transpor
%%
handle_cast({async_call, ReceiverPid, Ref, CallType, CallBin}, State = #state{transport = Transport, socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
Transport:send(Socket, <<?PACKET_ASYNC_CALL, PacketId:32, CallType:8, CallBin/binary>>),
Transport:send(Socket, <<?PACKET_RPC, PacketId:32, CallType:8, CallBin/binary>>),
{noreply, State#state{packet_id = PacketId + 1, inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}.
%% auth验证
handle_info({tcp, Socket, <<?PACKET_REQUEST, PacketId:32, ?METHOD_AUTH:8, AuthRequestBin/binary>>}, State = #state{transport = Transport, socket = Socket}) ->
#auth_request{ uuid = UUID, username = Username, token = Token, salt = Salt, timestamp = Timestamp } = message_pb:decode_msg(AuthRequestBin, auth_request),
handle_info({tcp, Socket, <<?PACKET_REQUEST, PacketId:32, RequestBin/binary>>}, State = #state{transport = Transport, socket = Socket}) ->
{ok, #auth_request{ uuid = UUID, username = Username, token = Token, salt = Salt, timestamp = Timestamp}} = message_codec:decode(RequestBin),
lager:debug("[ws_channel] auth uuid: ~p", [UUID]),
case iot_auth:check(Username, Token, UUID, Salt, Timestamp) of
true ->
@ -111,20 +110,20 @@ handle_info({tcp, Socket, <<?PACKET_REQUEST, PacketId:32, ?METHOD_AUTH:8, AuthRe
ok ->
%% host的monitor
erlang:monitor(process, HostPid),
AuthReplyBin = message_pb:encode_msg(#auth_reply{code = 0, message = <<"ok">>}),
AuthReplyBin = message_codec:encode(?MESSAGE_AUTH_REPLY, #auth_reply{code = 0, message = <<"ok">>}),
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, AuthReplyBin/binary>>),
{noreply, State#state{uuid = UUID, host_pid = HostPid}};
{denied, Reason} when is_binary(Reason) ->
erlang:monitor(process, HostPid),
AuthReplyBin = message_pb:encode_msg(#auth_reply{code = 1, message = Reason}),
AuthReplyBin = message_codec:encode(?MESSAGE_AUTH_REPLY, #auth_reply{code = 1, message = Reason}),
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, AuthReplyBin/binary>>),
lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]),
{noreply, State#state{uuid = UUID, host_pid = HostPid}};
{error, Reason} when is_binary(Reason) ->
AuthReplyBin = message_pb:encode_msg(#auth_reply{code = 2, message = Reason}),
AuthReplyBin = message_codec:encode(?MESSAGE_AUTH_REPLY, #auth_reply{code = 2, message = Reason}),
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, AuthReplyBin/binary>>),
lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]),
@ -136,51 +135,33 @@ handle_info({tcp, Socket, <<?PACKET_REQUEST, PacketId:32, ?METHOD_AUTH:8, AuthRe
{stop, State}
end;
%%
handle_info({tcp, Socket, <<?PACKET_REQUEST, PacketId:32, ?METHOD_REQUEST_SERVICE_CONFIG:8, ServiceId/binary>>}, State = #state{transport = Transport, socket = Socket}) ->
lager:debug("[ws_channel] service_config request service_id: ~p", [ServiceId]),
case micro_service_bo:get_service_config(ServiceId) of
error ->
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32>>);
{ok, ConfigJson} when is_binary(ConfigJson) ->
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, ConfigJson/binary>>)
handle_info({tcp, Socket, <<?PACKET_CAST, CastBin/binary>>}, State = #state{socket = Socket, host_pid = HostPid}) when is_pid(HostPid) ->
{ok, CastMessage} = message_codec:decode(CastBin),
case CastMessage of
#data{} = Data ->
iot_host:handle(HostPid, {data, Data});
#event{} = Event ->
iot_host:handle(HostPid, {event, Event})
end,
{noreply, State};
handle_info({tcp, Socket, <<?PACKET_REQUEST, ?METHOD_DATA:8, Data0/binary>>}, State = #state{socket = Socket, host_pid = HostPid}) when is_pid(HostPid) ->
Data = message_pb:decode_msg(Data0, data),
iot_host:handle(HostPid, {data, Data}),
{noreply, State};
handle_info({tcp, Socket, <<?PACKET_REQUEST, ?METHOD_PING:8, PingData/binary>>}, State = #state{socket = Socket, host_pid = HostPid}) when is_pid(HostPid) ->
Ping = message_pb:decode_msg(PingData, ping),
iot_host:handle(HostPid, {ping, Ping}),
{noreply, State};
handle_info({tcp, Socket, <<?PACKET_REQUEST, ?METHOD_INFORM:8, InformData/binary>>}, State = #state{socket = Socket, host_pid = HostPid}) when is_pid(HostPid) ->
ServiceInform = message_pb:decode_msg(InformData, service_inform),
iot_host:handle(HostPid, {inform, ServiceInform}),
{noreply, State};
handle_info({tcp, Socket, <<?PACKET_REQUEST, ?METHOD_EVENT:8, EventData/binary>>}, State = #state{socket = Socket, host_pid = HostPid}) when is_pid(HostPid) ->
Event = message_pb:decode_msg(EventData, event),
iot_host:handle(HostPid, {event, Event}),
{noreply, State};
%handle_info({tcp, Socket, <<?PACKET_PING, PingData/binary>>}, State = #state{socket = Socket, host_pid = HostPid}) when is_pid(HostPid) ->
% Ping = message_pb:decode_msg(PingData, ping),
% iot_host:handle(HostPid, {ping, Ping}),
% {noreply, State};
%%
handle_info({tcp, Socket, <<?PACKET_ASYNC_CALL_REPLY, PacketId:32, ResponseBin/binary>>}, State = #state{socket = Socket, uuid = UUID, inflight = Inflight}) when PacketId > 0 ->
AsyncCallReply = message_pb:decode_msg(ResponseBin, async_call_reply),
lager:debug("[ws_channel] uuid: ~p, get async_call_reply: ~p, packet_id: ~p", [UUID, AsyncCallReply, PacketId]),
handle_info({tcp, Socket, <<?PACKET_RPC_REPLY, PacketId:32, ResponseBin/binary>>}, State = #state{socket = Socket, uuid = UUID, inflight = Inflight}) when PacketId > 0 ->
{ok, RpcReply} = message_codec:decode(ResponseBin),
case maps:take(PacketId, Inflight) of
error ->
lager:warning("[ws_channel] get unknown async_call_reply message: ~p, packet_id: ~p", [AsyncCallReply, PacketId]),
{noreply, State};
{{ReceiverPid, Ref}, NInflight} ->
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
true ->
ReceiverPid ! {async_call_reply, Ref, AsyncCallReply};
ReceiverPid ! {rpc_reply, Ref, RpcReply};
false ->
lager:warning("[ws_channel] get async_call_reply message: ~p, packet_id: ~p, but receiver_pid is deaded", [AsyncCallReply, PacketId])
lager:warning("[ws_channel] get async_call_reply message: ~p, packet_id: ~p, but receiver_pid is deaded", [RpcReply, PacketId])
end,
{noreply, State#state{inflight = NInflight}}
end;