From a755913e7927cff38393e7fcdfb23f2cb70794dd Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 24 Sep 2025 17:32:36 +0800 Subject: [PATCH] fix --- apps/iot/include/message.hrl | 9 +++++++++ apps/iot/src/message/message_codec.erl | 8 ++++++++ apps/iot/src/tcp/tcp_channel.erl | 4 +++- 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/apps/iot/include/message.hrl b/apps/iot/include/message.hrl index f5f13c9..2f1b985 100644 --- a/apps/iot/include/message.hrl +++ b/apps/iot/include/message.hrl @@ -28,6 +28,9 @@ -define(MESSAGE_DATA, 16#06). -define(MESSAGE_EVENT, 16#07). +%% efka主动上报的event-stream流, 单向消息,主要是: docker-create的实时处理逻辑上报 +-define(MESSAGE_EVENT_STREAM, 16#08). + %% 响应数据 -define(MESSAGE_RPC_REPLY, 16#FF). @@ -80,4 +83,10 @@ service_id :: binary(), event_type :: integer(), params :: binary() +}). + +-record(task_event_stream, { + task_id :: integer(), + type :: binary(), + stream :: binary() }). \ No newline at end of file diff --git a/apps/iot/src/message/message_codec.erl b/apps/iot/src/message/message_codec.erl index 6e2927b..eb34af5 100644 --- a/apps/iot/src/message/message_codec.erl +++ b/apps/iot/src/message/message_codec.erl @@ -66,6 +66,12 @@ encode0(#event{service_id = ServiceId, event_type = EventType, params = Params}) marshal(?Bytes, ServiceId), marshal(?I32, EventType), marshal(?Bytes, Params) + ]); +encode0(#task_event_stream{task_id = TaskId, type = Type, stream = Stream}) -> + iolist_to_binary([ + marshal(?I32, TaskId), + marshal(?Bytes, Type), + marshal(?Bytes, Stream) ]). -spec decode(Bin :: binary()) -> {ok, Message :: any()} | error. @@ -92,6 +98,8 @@ decode0(?MESSAGE_DATA, [ServiceId, DeviceUUID, RouteKey, Metric]) -> {ok, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}}; decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) -> {ok, #event{service_id = ServiceId, event_type = EventType, params = Params}}; +decode0(?MESSAGE_EVENT_STREAM, [TaskId, Type, Stream]) -> + {ok, #task_event_stream{task_id = TaskId, type = Type, stream = Stream}}; decode0(_, _) -> error. diff --git a/apps/iot/src/tcp/tcp_channel.erl b/apps/iot/src/tcp/tcp_channel.erl index 11f4eb5..722d359 100644 --- a/apps/iot/src/tcp/tcp_channel.erl +++ b/apps/iot/src/tcp/tcp_channel.erl @@ -138,7 +138,9 @@ handle_info({tcp, Socket, <>}, State = #state{sock #data{} = Data -> iot_host:handle(HostPid, {data, Data}); #event{} = Event -> - iot_host:handle(HostPid, {event, Event}) + iot_host:handle(HostPid, {event, Event}); + #task_event_stream{task_id = TaskId, type = Type, stream = Stream} -> + lager:debug("[tcp_channel] get task_id: ~p, type: ~ts, stream: ~ts", [TaskId, Type, Stream]) end, {noreply, State};