142 lines
4.9 KiB
Erlang
142 lines
4.9 KiB
Erlang
%%%-------------------------------------------------------------------
|
|
%%% @author anlicheng
|
|
%%% @copyright (C) 2025, <COMPANY>
|
|
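%%% @doc
%%% Binary codec for the message records used with message.hrl.
%%%
%%% An encoded message is one message-type byte followed by its fields.
%%% Every field starts with a one-byte type tag: 1 = 8-bit integer,
%%% 2 = 16-bit integer, 3 = 32-bit integer, 4 = byte string preceded by a
%%% 16-bit length.
%%% @end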
|
|
%%% Created : 17. 9月 2025 16:05
|
|
%%%-------------------------------------------------------------------
|
|
-module(message_codec).
|
|
-author("anlicheng").
|
|
-include("message.hrl").
|
|
|
|
%% Field type tags. marshal/2 writes one of these as the first byte of every
%% encoded field and unmarshal/2 dispatches on it when decoding.
%% I8: integer stored in 8 bits.
-define(I8, 1).
|
|
%% I16: integer stored in 16 bits.
-define(I16, 2).
|
|
%% I32: integer stored in 32 bits.
-define(I32, 3).
|
|
%% Bytes: binary stored as a 16-bit length followed by the raw bytes.
-define(Bytes, 4).
|
|
|
|
%% API
|
|
-export([encode/2, decode/1]).
|
|
|
|
%% Encode a message record into its wire representation: a single
%% message-type byte followed by the record body produced by encode0/1.
%% MessageType is written into one byte; Message must be one of the records
%% encode0/1 has a clause for.
-spec encode(MessageType :: integer(), Message :: any()) -> binary().
encode(MessageType, Message) when is_integer(MessageType) ->
    <<MessageType:8/integer, (encode0(Message))/binary>>.
|
|
%% Serialise the body of a message record, i.e. everything that follows the
%% message-type byte. There is one clause per supported record and the fields
%% are emitted in the order listed in that clause.
%% A #jsonrpc_reply{} is encoded only when at least one of result/error is
%% undefined; any other term fails with function_clause.
encode0(#auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}) ->
    marshal_fields([
        {?Bytes, UUID},
        {?Bytes, Username},
        {?Bytes, Salt},
        {?Bytes, Token},
        {?I32, Timestamp}
    ]);
encode0(#auth_reply{code = Code, payload = Payload}) ->
    marshal_fields([
        {?I32, Code},
        {?Bytes, Payload}
    ]);
encode0(#jsonrpc_reply{result = Result, error = undefined}) ->
    %% Successful reply: the result is wrapped in a map and shipped as an
    %% external-term-format blob.
    Body = erlang:term_to_binary(#{<<"result">> => Result}),
    marshal_fields([{?Bytes, Body}]);
encode0(#jsonrpc_reply{result = undefined, error = Error}) ->
    %% Error reply: same envelope, keyed by <<"error">> instead.
    Body = erlang:term_to_binary(#{<<"error">> => Error}),
    marshal_fields([{?Bytes, Body}]);
encode0(#pub{topic = Topic, qos = Qos, content = Content}) ->
    marshal_fields([
        {?Bytes, Topic},
        {?I8, Qos},
        {?Bytes, Content}
    ]);
encode0(#command{command_type = CommandType, command = Command}) ->
    marshal_fields([
        {?I32, CommandType},
        {?Bytes, Command}
    ]);
encode0(#jsonrpc_request{method = Method, params = Params}) ->
    Body = erlang:term_to_binary(#{<<"method">> => Method, <<"params">> => Params}),
    marshal_fields([{?Bytes, Body}]);
encode0(#data{route_key = RouteKey, metric = Metric}) ->
    marshal_fields([
        {?Bytes, RouteKey},
        {?Bytes, Metric}
    ]);
encode0(#task_event_stream{task_id = TaskId, type = Type, stream = Stream}) ->
    marshal_fields([
        {?I32, TaskId},
        {?Bytes, Type},
        {?Bytes, Stream}
    ]).

%% Marshal each {TypeTag, Value} pair with marshal/2, left to right, and
%% concatenate the results into one binary.
marshal_fields(Fields) ->
    << <<(marshal(Type, Value))/binary>> || {Type, Value} <- Fields >>.
|
|
|
|
%% Decode a wire message produced by encode/2.
%% The first byte is the message type; the remainder is split into fields by
%% unmarshal/1 and handed to decode0/2 to build the matching record.
%% Returns {ok, Record}, or error when the input is empty, the field section
%% cannot be parsed, or decode0/2 rejects the type/field combination.
-spec decode(Bin :: binary()) -> {ok, Message :: any()} | error.
decode(<<PacketType:8, Packet/binary>>) ->
    case unmarshal(Packet) of
        {ok, Fields} ->
            decode0(PacketType, Fields);
        error ->
            error
    end;
decode(<<>>) ->
    %% An empty frame has no message-type byte. Report it like any other
    %% malformed input instead of failing with function_clause.
    error.
|
|
%% Build the record for a message type from its already-unmarshalled field
%% list. Returns {ok, Record}, or error when the message type is unknown, the
%% number of fields does not fit the type, or an embedded term blob is
%% malformed.
decode0(?MESSAGE_AUTH_REQUEST, [UUID, Username, Salt, Token, Timestamp]) ->
    {ok, #auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}};
decode0(?MESSAGE_JSONRPC_REPLY, [ReplyBin]) ->
    case decode_term(ReplyBin) of
        {ok, #{<<"result">> := Result}} ->
            {ok, #jsonrpc_reply{result = Result}};
        {ok, #{<<"error">> := Error}} ->
            {ok, #jsonrpc_reply{error = Error}};
        _ ->
            error
    end;
decode0(?MESSAGE_PUB, [Topic, Qos, Content]) ->
    {ok, #pub{topic = Topic, qos = Qos, content = Content}};
decode0(?MESSAGE_COMMAND, [CommandType, Command]) ->
    {ok, #command{command_type = CommandType, command = Command}};
decode0(?MESSAGE_AUTH_REPLY, [Code, Payload]) ->
    {ok, #auth_reply{code = Code, payload = Payload}};
decode0(?MESSAGE_JSONRPC_REQUEST, [ReqBody]) ->
    %% Both keys are required; a blob without them is reported as error
    %% rather than crashing with badmatch.
    case decode_term(ReqBody) of
        {ok, #{<<"method">> := Method, <<"params">> := Params}} ->
            {ok, #jsonrpc_request{method = Method, params = Params}};
        _ ->
            error
    end;
decode0(?MESSAGE_DATA, [RouteKey, Metric]) ->
    {ok, #data{route_key = RouteKey, metric = Metric}};
decode0(?MESSAGE_EVENT_STREAM, [TaskId, Type, Stream]) ->
    {ok, #task_event_stream{task_id = TaskId, type = Type, stream = Stream}};
decode0(_, _) ->
    error.

%% Decode an external-term-format blob taken from a received frame.
%% Returns {ok, Term}, or error when the field is not a binary or is not a
%% valid encoded term.
%% NOTE(review): the `safe` option makes binary_to_term refuse data that would
%% create new atoms or external funs. A payload containing an atom that does
%% not already exist on this node is therefore rejected - confirm that peers
%% only send atoms known to both sides.
decode_term(Bin) when is_binary(Bin) ->
    try erlang:binary_to_term(Bin, [safe]) of
        Term -> {ok, Term}
    catch
        error:badarg -> error
    end;
decode_term(_) ->
    error.
|
|
|
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
%%% helper methods
|
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
|
|
%% Encode a single field as a one-byte type tag followed by its value.
%%   ?I8 / ?I16 / ?I32 - integer written in 8 / 16 / 32 bits. The bit syntax
%%                       keeps only the low-order bits of a value that does
%%                       not fit, so callers must pass in-range integers.
%%   ?Bytes            - binary written as a 16-bit length plus the raw bytes.
%% Raises error({field_too_large, Size}) for a binary longer than 65535
%% bytes: the 16-bit length prefix cannot represent it, and writing a
%% truncated length would yield a frame the peer cannot parse.
%% Any other type/value combination fails with function_clause.
-spec marshal(Type :: ?I8 | ?I16 | ?I32 | ?Bytes, Field :: integer() | binary()) -> binary().
marshal(?I8, Field) when is_integer(Field) ->
    <<?I8, Field:8>>;
marshal(?I16, Field) when is_integer(Field) ->
    <<?I16, Field:16>>;
marshal(?I32, Field) when is_integer(Field) ->
    <<?I32, Field:32>>;
marshal(?Bytes, Field) when is_binary(Field), byte_size(Field) =< 16#FFFF ->
    Len = byte_size(Field),
    <<?Bytes, Len:16, Field/binary>>;
marshal(?Bytes, Field) when is_binary(Field) ->
    erlang:error({field_too_large, byte_size(Field)}).
|
|
|
|
%% Split the field section of a frame into a list of decoded values, in the
%% order they appear. Integer fields come back as integers and byte-string
%% fields as binaries. Returns error as soon as the remaining input does not
%% start with a complete, known field.
-spec unmarshal(Bin :: binary()) -> {ok, Components :: [any()]} | error.
unmarshal(Bin) when is_binary(Bin) ->
    unmarshal(Bin, []).

%% Worker loop: peel one tagged field off the front of the binary per step,
%% collecting values in reverse and restoring the order once input runs out.
unmarshal(<<>>, Fields) ->
    {ok, lists:reverse(Fields)};
unmarshal(<<?I8, Value:8/unsigned-integer, Tail/binary>>, Fields) ->
    unmarshal(Tail, [Value | Fields]);
unmarshal(<<?I16, Value:16/unsigned-integer, Tail/binary>>, Fields) ->
    unmarshal(Tail, [Value | Fields]);
unmarshal(<<?I32, Value:32/unsigned-integer, Tail/binary>>, Fields) ->
    unmarshal(Tail, [Value | Fields]);
unmarshal(<<?Bytes, Size:16/unsigned-integer, Value:Size/binary, Tail/binary>>, Fields) ->
    unmarshal(Tail, [Value | Fields]);
unmarshal(_Malformed, _Fields) ->
    %% Unknown tag, or fewer bytes left than the field requires.
    error.
|