This commit is contained in:
anlicheng 2025-09-24 17:32:36 +08:00
parent 2a9773dfc9
commit a755913e79
3 changed files with 20 additions and 1 deletions

View File

@ -28,6 +28,9 @@
-define(MESSAGE_DATA, 16#06). -define(MESSAGE_DATA, 16#06).
-define(MESSAGE_EVENT, 16#07). -define(MESSAGE_EVENT, 16#07).
%% efka主动上报的event-stream流, : docker-create的实时处理逻辑上报
-define(MESSAGE_EVENT_STREAM, 16#08).
%% %%
-define(MESSAGE_RPC_REPLY, 16#FF). -define(MESSAGE_RPC_REPLY, 16#FF).
@ -81,3 +84,9 @@
event_type :: integer(), event_type :: integer(),
params :: binary() params :: binary()
}). }).
-record(task_event_stream, {
task_id :: integer(),
type :: binary(),
stream :: binary()
}).

View File

@ -66,6 +66,12 @@ encode0(#event{service_id = ServiceId, event_type = EventType, params = Params})
marshal(?Bytes, ServiceId), marshal(?Bytes, ServiceId),
marshal(?I32, EventType), marshal(?I32, EventType),
marshal(?Bytes, Params) marshal(?Bytes, Params)
]);
encode0(#task_event_stream{task_id = TaskId, type = Type, stream = Stream}) ->
iolist_to_binary([
marshal(?I32, TaskId),
marshal(?Bytes, Type),
marshal(?Bytes, Stream)
]). ]).
-spec decode(Bin :: binary()) -> {ok, Message :: any()} | error. -spec decode(Bin :: binary()) -> {ok, Message :: any()} | error.
@ -92,6 +98,8 @@ decode0(?MESSAGE_DATA, [ServiceId, DeviceUUID, RouteKey, Metric]) ->
{ok, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}}; {ok, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}};
decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) -> decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) ->
{ok, #event{service_id = ServiceId, event_type = EventType, params = Params}}; {ok, #event{service_id = ServiceId, event_type = EventType, params = Params}};
decode0(?MESSAGE_EVENT_STREAM, [TaskId, Type, Stream]) ->
{ok, #task_event_stream{task_id = TaskId, type = Type, stream = Stream}};
decode0(_, _) -> decode0(_, _) ->
error. error.

View File

@ -138,7 +138,9 @@ handle_info({tcp, Socket, <<?PACKET_CAST, CastBin/binary>>}, State = #state{sock
#data{} = Data -> #data{} = Data ->
iot_host:handle(HostPid, {data, Data}); iot_host:handle(HostPid, {data, Data});
#event{} = Event -> #event{} = Event ->
iot_host:handle(HostPid, {event, Event}) iot_host:handle(HostPid, {event, Event});
#task_event_stream{task_id = TaskId, type = Type, stream = Stream} ->
lager:debug("[tcp_channel] get task_id: ~p, type: ~ts, stream: ~ts", [TaskId, Type, Stream])
end, end,
{noreply, State}; {noreply, State};