This commit is contained in:
anlicheng 2025-09-18 10:53:54 +08:00
parent a96bfc7da1
commit d18a371b44
2 changed files with 31 additions and 19 deletions

View File

@ -68,26 +68,32 @@ encode0(#event{service_id = ServiceId, event_type = EventType, params = Params})
marshal(?Bytes, Params) marshal(?Bytes, Params)
]). ]).
-spec decode(Bin :: binary()) -> any(). -spec decode(Bin :: binary()) -> {ok, Message :: any()} | error.
decode(<<PacketType:8, Packet/binary>>) -> decode(<<PacketType:8, Packet/binary>>) ->
Fields = unmarshal(Packet), case unmarshal(Packet) of
decode0(PacketType, Fields). {ok, Fields} ->
decode0(PacketType, Fields);
error ->
error
end.
decode0(?MESSAGE_AUTH_REQUEST, [UUID, Username, Salt, Token, Timestamp]) -> decode0(?MESSAGE_AUTH_REQUEST, [UUID, Username, Salt, Token, Timestamp]) ->
#auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}; {ok, #auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}};
decode0(?MESSAGE_AUTH_REPLY, [Code, Message]) -> decode0(?MESSAGE_AUTH_REPLY, [Code, Message]) ->
#auth_reply{code = Code, message = Message}; {ok, #auth_reply{code = Code, message = Message}};
decode0(?MESSAGE_PUB, [Topic, Content]) -> decode0(?MESSAGE_PUB, [Topic, Content]) ->
#pub{topic = Topic, content = Content}; {ok, #pub{topic = Topic, content = Content}};
decode0(?MESSAGE_COMMAND, [CommandType, Command]) -> decode0(?MESSAGE_COMMAND, [CommandType, Command]) ->
#command{command_type = CommandType, command = Command}; {ok, #command{command_type = CommandType, command = Command}};
decode0(?MESSAGE_RPC_DEPLOY, [TaskId, Config]) -> decode0(?MESSAGE_RPC_DEPLOY, [TaskId, Config]) ->
#rpc_deploy{task_id = TaskId, config = Config}; {ok, #rpc_deploy{task_id = TaskId, config = Config}};
decode0(?MESSAGE_RPC_CONTAINER, [Method, ContainerName, Params]) -> decode0(?MESSAGE_RPC_CONTAINER, [Method, ContainerName, Params]) ->
#rpc_container{method = Method, container_name = ContainerName, params = Params}; {ok, #rpc_container{method = Method, container_name = ContainerName, params = Params}};
decode0(?MESSAGE_DATA, [ServiceId, DeviceUUID, RouteKey, Metric]) -> decode0(?MESSAGE_DATA, [ServiceId, DeviceUUID, RouteKey, Metric]) ->
#data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}; {ok, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}};
decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) -> decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) ->
#event{service_id = ServiceId, event_type = EventType, params = Params}. {ok, #event{service_id = ServiceId, event_type = EventType, params = Params}};
decode0(_, _) ->
error.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% helper methods %%% helper methods
@ -97,17 +103,23 @@ decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) ->
marshal(?I32, Field) when is_integer(Field) -> marshal(?I32, Field) when is_integer(Field) ->
<<?I32, Field:32>>; <<?I32, Field:32>>;
marshal(?Bytes, undefined) -> marshal(?Bytes, undefined) ->
<<?Bytes>>; <<?Bytes, 0:16>>;
marshal(?Bytes, Field) when is_binary(Field) -> marshal(?Bytes, Field) when is_binary(Field) ->
Len = byte_size(Field),
<<?Bytes, Len:16, Field/binary>>;
marshal(?Bytes, Field0) ->
Field = unicode:characters_to_binary(Field0),
Len = byte_size(Field), Len = byte_size(Field),
<<?Bytes, Len:16, Field/binary>>. <<?Bytes, Len:16, Field/binary>>.
-spec unmarshal(Bin :: binary()) -> Components :: [any()]. -spec unmarshal(Bin :: binary()) -> {ok, Components :: [any()]} | error.
unmarshal(Bin) when is_binary(Bin) -> unmarshal(Bin) when is_binary(Bin) ->
unmarshal(Bin, []). unmarshal(Bin, []).
unmarshal(<<>>, Acc) -> unmarshal(<<>>, Acc) ->
lists:reverse(Acc); {ok, lists:reverse(Acc)};
unmarshal(<<?I32, F:32, Rest/binary>>, Acc) -> unmarshal(<<?I32, F:32, Rest/binary>>, Acc) ->
unmarshal(Rest, [F|Acc]); unmarshal(Rest, [F|Acc]);
unmarshal(<<?Bytes, Len:16, F:Len/binary, Rest/binary>>, Acc) -> unmarshal(<<?Bytes, Len:16, F:Len/binary, Rest/binary>>, Acc) ->
unmarshal(Rest, [F|Acc]). unmarshal(Rest, [F|Acc]);
unmarshal(_, _) ->
error.

View File

@ -118,7 +118,7 @@ handle_cast({auth_request, AuthRequestBin}, State = #state{parent_pid = ParentPi
%% auth返回的结果 %% auth返回的结果
receive receive
{ssl, Socket, <<?PACKET_RESPONSE, PacketId:32, ReplyBin/binary>>} -> {ssl, Socket, <<?PACKET_RESPONSE, PacketId:32, ReplyBin/binary>>} ->
Reply = efka_codec:decode(ReplyBin), {ok, #auth_reply{} = Reply} = efka_codec:decode(ReplyBin),
ParentPid ! {auth_reply, {ok, Reply}}, ParentPid ! {auth_reply, {ok, Reply}},
{noreply, State}; {noreply, State};
{ssl, Socket, Info} -> {ssl, Socket, Info} ->
@ -147,17 +147,17 @@ handle_cast({rpc_reply, PacketId, Response}, State = #state{socket = Socket}) ->
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
%% packetId的是要求返回的0 %% packetId的是要求返回的0
handle_info({ssl, Socket, <<?PACKET_COMMAND, CommandBin/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> handle_info({ssl, Socket, <<?PACKET_COMMAND, CommandBin/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
Command = efka_codec:decode(CommandBin), {ok, #command{} = Command} = efka_codec:decode(CommandBin),
ParentPid ! {server_command, Command}, ParentPid ! {server_command, Command},
{noreply, State}; {noreply, State};
handle_info({ssl, Socket, <<?PACKET_PUB, PubBin/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> handle_info({ssl, Socket, <<?PACKET_PUB, PubBin/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
Pub = efka_codec:decode(PubBin), {ok, #pub{} = Pub} = efka_codec:decode(PubBin),
ParentPid ! {server_pub, Pub}, ParentPid ! {server_pub, Pub},
{noreply, State}; {noreply, State};
handle_info({ssl, Socket, <<?PACKET_RPC, PacketId:32, RPCRequestBin/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> handle_info({ssl, Socket, <<?PACKET_RPC, PacketId:32, RPCRequestBin/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
RPCRequest = efka_codec:decode(RPCRequestBin), {ok, RPCRequest} = efka_codec:decode(RPCRequestBin),
ParentPid ! {server_rpc, PacketId, RPCRequest}, ParentPid ! {server_rpc, PacketId, RPCRequest},
{noreply, State}; {noreply, State};