diff --git a/apps/efka/include/efka.hrl b/apps/efka/include/efka.hrl index e3c3823..5befe01 100644 --- a/apps/efka/include/efka.hrl +++ b/apps/efka/include/efka.hrl @@ -46,4 +46,51 @@ -define(RPC_DEPLOY, 16#01). -define(RPC_START_CONTAINER, 16#02). -define(RPC_STOP_CONTAINER, 16#03). --define(RPC_CONFIG_CONTAINER, 16#04). \ No newline at end of file +-define(RPC_CONFIG_CONTAINER, 16#04). + +-record(auth_request, { + uuid :: binary(), + username :: binary(), + salt :: binary(), + token :: binary(), + timestamp :: integer() +}). + +-record(auth_reply, { + code :: integer(), + message :: binary() +}). + +-record(pub, { + topic :: binary(), + content :: binary() +}). + +-record(command, { + command_type :: integer(), + command :: binary() +}). + +-record(rpc_deploy, { + task_id :: integer(), + config :: binary() +}). + +-record(rpc_container, { + method :: binary(), + container_name :: binary(), + params = <<>> :: binary() +}). + +-record(data, { + service_id :: binary(), + device_uuid :: binary(), + route_key :: binary(), + metric :: binary() +}). + +-record(event, { + service_id :: binary(), + event_type :: integer(), + params :: binary() +}). \ No newline at end of file diff --git a/apps/efka/src/efka_codec.erl b/apps/efka/src/efka_codec.erl new file mode 100644 index 0000000..b7f500f --- /dev/null +++ b/apps/efka/src/efka_codec.erl @@ -0,0 +1,107 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 17. 9月 2025 16:05 +%%%------------------------------------------------------------------- +-module(efka_codec). +-author("anlicheng"). +-include("efka.hrl"). + +-define(I32, 1). +-define(Bytes, 2). + +%% API +-export([encode/1, decode/1]). + +-spec encode(Message :: any()) -> binary(). +encode(#auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}) -> + iolist_to_binary([ + marshal(?Bytes, UUID), + marshal(?Bytes, Username), + marshal(?Bytes, Salt), + marshal(?Bytes, Token), + marshal(?I32, Timestamp) + ]); +encode(#auth_reply{code = Code, message = Message}) -> + iolist_to_binary([ + marshal(?I32, Code), + marshal(?Bytes, Message) + ]); +encode(#pub{topic = Topic, content = Content}) -> + iolist_to_binary([ + marshal(?Bytes, Topic), + marshal(?Bytes, Content) + ]); +encode(#command{command_type = CommandType, command = Command}) -> + iolist_to_binary([ + marshal(?I32, CommandType), + marshal(?Bytes, Command) + ]); +encode(#rpc_deploy{task_id = TaskId, config = Config}) -> + iolist_to_binary([ + marshal(?I32, TaskId), + marshal(?Bytes, Config) + ]); +encode(#rpc_container{method = Method, container_name = ContainerName, params = Params}) -> + iolist_to_binary([ + marshal(?Bytes, Method), + marshal(?Bytes, ContainerName), + marshal(?Bytes, Params) + ]); +encode(#data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}) -> + iolist_to_binary([ + marshal(?Bytes, ServiceId), + marshal(?Bytes, DeviceUUID), + marshal(?Bytes, RouteKey), + marshal(?Bytes, Metric) + ]); +encode(#event{service_id = ServiceId, event_type = EventType, params = Params}) -> + iolist_to_binary([ + marshal(?Bytes, ServiceId), + marshal(?I32, EventType), + marshal(?Bytes, Params) + ]). + +decode(<>) -> + Fields = unmarshal(Packet), + decode0(PacketType, Fields). +decode0(?I32, [UUID, Username, Salt, Token, Timestamp]) -> + #auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}; +decode0(?I32, [Code, Message]) -> + #auth_reply{code = Code, message = Message}; +decode0(?I32, [Topic, Content]) -> + #pub{topic = Topic, content = Content}; +decode0(?I32, [CommandType, Command]) -> + #command{command_type = CommandType, command = Command}; +decode0(?I32, [TaskId, Config]) -> + #rpc_deploy{task_id = TaskId, config = Config}; +decode0(?I32, [Method, ContainerName, Params]) -> + #rpc_container{method = Method, container_name = ContainerName, params = Params}; +decode0(?I32, [ServiceId, DeviceUUID, RouteKey, Metric]) -> + #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}; +decode0(?I32, [ServiceId, EventType, Params]) -> + #event{service_id = ServiceId, event_type = EventType, params = Params}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%% helper methods +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +marshal(?I32, Field) -> + <>; +marshal(?Bytes, undefined) -> + <>; +marshal(?Bytes, Field) -> + Len = byte_size(Field), + <>. + +unmarshal(Bin) -> + unmarshal(Bin, []). +unmarshal(<<>>, Acc) -> + lists:reverse(Acc); +unmarshal(<>, Acc) -> + unmarshal(Rest, [F|Acc]); +unmarshal(<>, Acc) -> + unmarshal(Rest, [F|Acc]). \ No newline at end of file