From d18a371b4417fd690001ab4ebd858c124b809003 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Thu, 18 Sep 2025 10:53:54 +0800 Subject: [PATCH] fix --- apps/efka/src/efka_codec.erl | 42 ++++++++++++++++++++------------ apps/efka/src/efka_transport.erl | 8 +++--- 2 files changed, 31 insertions(+), 19 deletions(-) diff --git a/apps/efka/src/efka_codec.erl b/apps/efka/src/efka_codec.erl index 71787cd..8ea401b 100644 --- a/apps/efka/src/efka_codec.erl +++ b/apps/efka/src/efka_codec.erl @@ -68,26 +68,32 @@ encode0(#event{service_id = ServiceId, event_type = EventType, params = Params}) marshal(?Bytes, Params) ]). --spec decode(Bin :: binary()) -> any(). +-spec decode(Bin :: binary()) -> {ok, Message :: any()} | error. decode(<>) -> - Fields = unmarshal(Packet), - decode0(PacketType, Fields). + case unmarshal(Packet) of + {ok, Fields} -> + decode0(PacketType, Fields); + error -> + error + end. decode0(?MESSAGE_AUTH_REQUEST, [UUID, Username, Salt, Token, Timestamp]) -> - #auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}; + {ok, #auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}}; decode0(?MESSAGE_AUTH_REPLY, [Code, Message]) -> - #auth_reply{code = Code, message = Message}; + {ok, #auth_reply{code = Code, message = Message}}; decode0(?MESSAGE_PUB, [Topic, Content]) -> - #pub{topic = Topic, content = Content}; + {ok, #pub{topic = Topic, content = Content}}; decode0(?MESSAGE_COMMAND, [CommandType, Command]) -> - #command{command_type = CommandType, command = Command}; + {ok, #command{command_type = CommandType, command = Command}}; decode0(?MESSAGE_RPC_DEPLOY, [TaskId, Config]) -> - #rpc_deploy{task_id = TaskId, config = Config}; + {ok, #rpc_deploy{task_id = TaskId, config = Config}}; decode0(?MESSAGE_RPC_CONTAINER, [Method, ContainerName, Params]) -> - #rpc_container{method = Method, container_name = ContainerName, params = Params}; + {ok, #rpc_container{method = Method, container_name = ContainerName, params = Params}}; decode0(?MESSAGE_DATA, [ServiceId, DeviceUUID, RouteKey, Metric]) -> - #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}; + {ok, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}}; decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) -> - #event{service_id = ServiceId, event_type = EventType, params = Params}. + {ok, #event{service_id = ServiceId, event_type = EventType, params = Params}}; +decode0(_, _) -> + error. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% helper methods @@ -97,17 +103,23 @@ decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) -> marshal(?I32, Field) when is_integer(Field) -> <>; marshal(?Bytes, undefined) -> - <>; + <>; marshal(?Bytes, Field) when is_binary(Field) -> + Len = byte_size(Field), + <>; +marshal(?Bytes, Field0) -> + Field = unicode:characters_to_binary(Field0), Len = byte_size(Field), <>. --spec unmarshal(Bin :: binary()) -> Components :: [any()]. +-spec unmarshal(Bin :: binary()) -> {ok, Components :: [any()]} | error. unmarshal(Bin) when is_binary(Bin) -> unmarshal(Bin, []). unmarshal(<<>>, Acc) -> - lists:reverse(Acc); + {ok, lists:reverse(Acc)}; unmarshal(<>, Acc) -> unmarshal(Rest, [F|Acc]); unmarshal(<>, Acc) -> - unmarshal(Rest, [F|Acc]). \ No newline at end of file + unmarshal(Rest, [F|Acc]); +unmarshal(_, _) -> + error. diff --git a/apps/efka/src/efka_transport.erl b/apps/efka/src/efka_transport.erl index 5d90b63..7c9b300 100644 --- a/apps/efka/src/efka_transport.erl +++ b/apps/efka/src/efka_transport.erl @@ -118,7 +118,7 @@ handle_cast({auth_request, AuthRequestBin}, State = #state{parent_pid = ParentPi %% 需要等待auth返回的结果 receive {ssl, Socket, <>} -> - Reply = efka_codec:decode(ReplyBin), + {ok, #auth_reply{} = Reply} = efka_codec:decode(ReplyBin), ParentPid ! {auth_reply, {ok, Reply}}, {noreply, State}; {ssl, Socket, Info} -> @@ -147,17 +147,17 @@ handle_cast({rpc_reply, PacketId, Response}, State = #state{socket = Socket}) -> {stop, Reason :: term(), NewState :: #state{}}). %% 服务器主动推送的数据,有packetId的是要求返回的;为0的表示不需要返回值 handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> - Command = efka_codec:decode(CommandBin), + {ok, #command{} = Command} = efka_codec:decode(CommandBin), ParentPid ! {server_command, Command}, {noreply, State}; handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> - Pub = efka_codec:decode(PubBin), + {ok, #pub{} = Pub} = efka_codec:decode(PubBin), ParentPid ! {server_pub, Pub}, {noreply, State}; handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> - RPCRequest = efka_codec:decode(RPCRequestBin), + {ok, RPCRequest} = efka_codec:decode(RPCRequestBin), ParentPid ! {server_rpc, PacketId, RPCRequest}, {noreply, State};