From ffd786b96f73cf4f2e08896774328a8398810779 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 6 May 2025 17:23:40 +0800 Subject: [PATCH] fix --- apps/efka/src/efka_client.erl | 2 +- apps/efka/src/efka_micro_service.erl | 21 ++- apps/efka/src/efka_tcp_channel.erl | 270 ++++++++++++++++++++------- apps/efka/src/tcp_channel.erl | 203 -------------------- 4 files changed, 221 insertions(+), 275 deletions(-) delete mode 100644 apps/efka/src/tcp_channel.erl diff --git a/apps/efka/src/efka_client.erl b/apps/efka/src/efka_client.erl index b4d7fb5..6406052 100644 --- a/apps/efka/src/efka_client.erl +++ b/apps/efka/src/efka_client.erl @@ -196,7 +196,7 @@ handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket, {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). handle_cast({send_metric_data, Fields, Tags}, State = #state{socket = Socket}) -> - Body = jiffy:encode(#{<<"data">> => Fields, <<"tag">> => Tags}, [force_utf8]), + Body = jiffy:encode(#{<<"fields">> => Fields, <<"tags">> => Tags}, [force_utf8]), Packet = <<0:32, ?PACKET_METRIC_DATA, Body/binary>>, ok = gen_tcp:send(Socket, Packet), diff --git a/apps/efka/src/efka_micro_service.erl b/apps/efka/src/efka_micro_service.erl index f88622b..d67ea2f 100644 --- a/apps/efka/src/efka_micro_service.erl +++ b/apps/efka/src/efka_micro_service.erl @@ -20,7 +20,8 @@ %% API -export([start_link/2]). -export([get_name/1, get_pid/1, start_service/1, stop_service/1, attach_channel/2]). --export([push_arguments/2, push_metrics/2]). +-export([push_arguments/2, push_metrics/2, request_arguments/1, request_metrics/1]). +-export([metric_data/3, send_log/2, send_event/3, send_ai_event/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -70,6 +71,24 @@ push_arguments(Pid, Args) when is_pid(Pid), is_binary(Args) -> push_metrics(Pid, Metrics) when is_pid(Pid), is_binary(Metrics) -> gen_server:call(Pid, {push_metrics, Metrics}). +request_arguments(Pid) when is_pid(Pid) -> + gen_server:call(Pid, request_arguments). + +request_metrics(Pid) when is_pid(Pid) -> + gen_server:call(Pid, request_metrics). + +metric_data(Pid, Fields, Tags) -> + ok. + +send_log(Pid, Log) -> + ok. + +send_event(Pid, EventType, Params) -> + ok. + +send_ai_event(Pid, EventType, Params) -> + ok. + -spec attach_channel(pid(), pid()) -> ok | {error, Reason :: binary()}. attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) -> gen_server:call(Pid, {attach_channel, ChannelPid}). diff --git a/apps/efka/src/efka_tcp_channel.erl b/apps/efka/src/efka_tcp_channel.erl index ed33db4..c67f0ad 100644 --- a/apps/efka/src/efka_tcp_channel.erl +++ b/apps/efka/src/efka_tcp_channel.erl @@ -1,110 +1,240 @@ %%%------------------------------------------------------------------- -%%% @author licheng5 -%%% @copyright (C) 2020, +%%% @author anlicheng +%%% @copyright (C) 2025, %%% @doc %%% %%% @end -%%% Created : 10. 12月 2020 上午11:17 +%%% Created : 30. 4月 2025 09:22 %%%------------------------------------------------------------------- -module(efka_tcp_channel). --author("licheng5"). --behaviour(gen_server). +-author("anlicheng"). -%% 心跳包监测机制 --define(PING_TICKER, 15000). +-behaviour(gen_server). %% API -export([start_link/1]). --export([publish_command/4, send_event/3, stop/2, move_network/3]). +-export([push_metric/2, push_param/2]). +%% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-define(SERVER, ?MODULE). + +%% 消息类型 + +%% 服务注册 +-define(PACKET_REGISTER, 16). +%% 上传数据 +-define(PACKET_METRIC_DATA, 3). +%% 消息响应 +-define(PACKET_RESPONSE, 7). +%% efka下发给微服务参数 +-define(PACKET_PUSH_PARAM, 5). +%% efka下发给微服务采集项 +-define(PACKET_PUSH_METRIC, 6). + +%% 微服务给efka发送log消息 +-define(PACKET_LOG, 9). + +%% 微服务从efka获取自身的采集项 +-define(PACKET_REQUEST_METRIC, 10). +%% 微服务从efka获取自身的参数 +-define(PACKET_REQUEST_PARAM, 12). +%% 微服务事件上报 +-define(PACKET_EVENT, 15). +-define(PACKET_AI_EVENT, 16). + -record(state, { - socket, - %% 标记是否已经注册 - is_registered = false, - %% 记录ping的次数 - ping_counter = 0, - %% 发送消息对应的id - packet_id = 1 :: integer(), - %% 请求响应的对应关系 + packet_id = 1, + socket :: gen_tcp:socket(), + service_id :: undefined | binary(), + service_pid :: undefined | pid(), + is_registered = false :: boolean(), + + %% 请求响应的对应关系, #{packet_id => {ReceiverPid, Ref}} inflight = #{} }). -%% 向通道中写入消息 --spec publish_command(Pid :: pid(), ReceiverPid :: pid(), CommandType :: integer(), Msg :: binary()) -> Ref :: reference(). -publish_command(Pid, ReceiverPid, CommandType, Msg) when is_pid(Pid), is_pid(ReceiverPid), is_integer(CommandType), is_binary(Msg) -> +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec push_param(ChannelPid :: pid(), Params :: binary()) -> {ok, Ref :: reference()}. +push_param(ChannelPid, Params) when is_pid(ChannelPid), is_binary(Params) -> Ref = make_ref(), - Pid ! {publish_command, ReceiverPid, Ref, CommandType, Msg}, - Ref. + gen_server:cast(ChannelPid, {push_param, Ref, self(), Params}), + {ok, Ref}. -%% 网络迁移是一种特殊的指令信息,需要单独处理 --spec move_network(Pid :: pid(), ReceiverPid :: pid(), NetworkPid :: pid()) -> Ref :: reference(). -move_network(Pid, ReceiverPid, NetworkPid) when is_pid(Pid), is_pid(ReceiverPid), is_pid(NetworkPid) -> +-spec push_metric(ChannelPid :: pid(), Metrics :: binary()) -> {ok, Ref :: reference()}. +push_metric(ChannelPid, Metrics) when is_pid(ChannelPid), is_binary(Metrics) -> Ref = make_ref(), - Pid ! {move_network, ReceiverPid, Ref, NetworkPid}, - Ref. - -%% 向通道中写入消息 --spec send_event(Pid :: pid(), EventType :: integer(), Event :: binary()) -> no_return(). -send_event(Pid, EventType, Event) when is_pid(Pid), is_integer(EventType), is_binary(Event) -> - Pid ! {send_event, EventType, Event}. - -%% 关闭方法 --spec stop(Pid :: pid(), Reason :: any()) -> no_return(). -stop(undefined, _Reason) -> - ok; -stop(Pid, Reason) when is_pid(Pid) -> - Pid ! {stop, Reason}. - -%%-------------------------------------------------------------------- -%% esockd callback -%%-------------------------------------------------------------------- + gen_server:cast(ChannelPid, {push_metric, Ref, self(), Metrics}), + {ok, Ref}. +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link(Socket :: gen_tcp:socket()) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). start_link(Socket) -> gen_server:start_link(?MODULE, [Socket], []). -init([Socket]) -> - efka_logger:debug("[sdlan_channel] get a new connection: ~p", [Socket]), - gen_tcp:controlling_process(Socket, self()), - ok = inet:setopts(Socket, [binary, {active, true}, {packet, 4}]), - % erlang:start_timer(?PING_TICKER, self(), ping_ticker), - gen_server:enter_loop(?MODULE, [], #state{socket = Socket}). +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== -handle_call(_Request, _From, State) -> +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([Socket]) -> + ok = inet:setopts(Socket, [{active, true}]), + lager:debug("[tcp_channel] get new socket: ~p", [Socket]), + {ok, #state{socket = Socket}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> {reply, ok, State}. -handle_cast(_Msg, State) -> +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +%% 推送参数项目 +handle_cast({push_param, Ref, ReceiverPid, Params}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> + ok = gen_tcp:send(Socket, <>), + {noreply, State#state{inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; + +%% 推送采集项目 +handle_cast({push_metric, Ref, ReceiverPid, Metrics}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> + ok = gen_tcp:send(Socket, <>), + {noreply, State#state{inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; + +handle_cast(_Request, State = #state{}) -> {noreply, State}. -%% 网络流量统计 -handle_info({tcp, _Sock, Body}, State = #state{}) -> - efka_logger:debug("[sdlan_channel] read body: ~p", [Body]), +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +%% 注册 +handle_info({tcp, Socket, <>}, State = #state{socket = Socket}) -> + case efka_micro_service:get_pid(ServiceId) of + undefined -> + ok = gen_tcp:send(Socket, <>); + Pid when is_pid(Pid) -> + case efka_micro_service:attach_channel(Pid, self()) of + ok -> + ok = gen_tcp:send(Socket, <>), + {noreply, State#state{service_id = ServiceId, service_pid = Pid, is_registered = true}}; + {error, Reason} -> + ok = gen_tcp:send(Socket, <>), + {stop, normal, State} + end + end; + +%% 请求参数 +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> + {ok, Args} = efka_micro_service:request_arguments(ServicePid), + ok = gen_tcp:send(Socket, <>), {noreply, State}; -handle_info({tcp_error, Sock, Reason}, State = #state{socket = Sock}) -> - efka_logger:notice("[sdlan_channel] tcp_error: ~p", [Reason]), - {stop, normal, State}; +%% 请求采集项目 +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> + {ok, Metrics} = efka_micro_service:request_metrics(ServicePid), + ok = gen_tcp:send(Socket, <>), + {noreply, State}; -handle_info({tcp_closed, Sock}, State = #state{socket = Sock}) -> - efka_logger:notice("[sdlan_channel] tcp_closed", []), - {stop, normal, State}; +%% 数据项 +handle_info({tcp, Socket, <<0:32, ?PACKET_METRIC_DATA:8, Data/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> + case catch jiffy:decode(Data, [return_maps]) of + #{<<"fields">> := Fields, <<"tag">> := Tags} -> + efka_micro_service:metric_data(ServicePid, Fields, Tags); + _ -> + ok + end, + {noreply, State}; -%% 关闭当前通道 -handle_info({stop, Reason}, State) -> - {stop, Reason, State}; +%% 远程日志 +handle_info({tcp, Socket, <<0:32, ?PACKET_LOG:8, Log/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> + efka_micro_service:send_log(ServicePid, Log), + {noreply, State}; -handle_info(Info, State) -> - efka_logger:debug("[sdlan_channel] get a unknown message: ~p, channel will closed", [Info]), +%% Event事件 +handle_info({tcp, Socket, <<0:32, ?PACKET_EVENT:8, EventData/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> + case catch jiffy:decode(EventData, [return_maps]) of + #{<<"event_type">> := EventType, <<"params">> := Params} -> + efka_micro_service:send_event(ServicePid, EventType, Params); + _ -> + ok + end, + {noreply, State}; + +%% AIEvent事件 +handle_info({tcp, Socket, <<0:32, ?PACKET_AI_EVENT:8, EventData/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> + case catch jiffy:decode(EventData, [return_maps]) of + #{<<"event_type">> := EventType, <<"params">> := Params} -> + efka_micro_service:send_ai_event(ServicePid, EventType, Params); + _ -> + ok + end, + {noreply, State}; + +%% 收到端上的响应 +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, inflight = Inflight}) -> + case maps:take(PacketId, Inflight) of + error -> + lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Response, PacketId]), + {noreply, State}; + {{ReceiverPid, Ref}, NInflight} -> + case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of + true -> + case Response of + <<1:8, Result/binary>> -> + ReceiverPid ! {channel_reply, Ref, {ok, Result}}; + <<0:8, Error/binary>> -> + ReceiverPid ! {channel_reply, Ref, {error, Error}} + end; + false -> + lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Response, PacketId]) + end, + {noreply, State#state{inflight = NInflight}} + end; + +handle_info(Info, State = #state{}) -> + lager:debug("[tcp_channel] get info: ~p", [Info]), {noreply, State}. -terminate(Reason, #state{}) -> - efka_logger:notice("[sdlan_channel] stop with reason: ~p", [Reason]), +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> ok. -code_change(_OldVsn, State, _Extra) -> +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> {ok, State}. -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% helper methods -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% \ No newline at end of file +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/efka/src/tcp_channel.erl b/apps/efka/src/tcp_channel.erl deleted file mode 100644 index abe6362..0000000 --- a/apps/efka/src/tcp_channel.erl +++ /dev/null @@ -1,203 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author anlicheng -%%% @copyright (C) 2025, -%%% @doc -%%% -%%% @end -%%% Created : 30. 4月 2025 09:22 -%%%------------------------------------------------------------------- --module(tcp_channel). --author("anlicheng"). - --behaviour(gen_server). - -%% API --export([start_link/1]). --export([push_metric/2, push_param/2, poll/2]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --define(SERVER, ?MODULE). - -%% 消息类型 - -%% 服务注册 --define(PACKET_REGISTER, 16). -%% 上传数据 --define(PACKET_TYPE_METRIC_DATA, 3). -%% 消息响应 --define(PACKET_TYPE_RESPONSE, 7). -%% efka下发给微服务参数 --define(PACKET_TYPE_PUSH_PARAM, 5). -%% efka下发给微服务采集项 --define(PACKET_TYPE_PUSH_METRIC, 6). -%% 设备状态轮询: 增加日期: 2025-4-16 --define(PACKET_TYPE_POLL, 20). -%% 微服务给efka发送log消息 --define(PACKET_TYPE_LOG, 9). -%% 微服务从efka获取自身的采集项 --define(PACKET_TYPE_REQUEST_METRIC, 10). -%% 微服务从efka获取自身的参数 --define(PACKET_TYPE_REQUEST_PARAM, 12). -%% 微服务事件上报 --define(PACKET_TYPE_EVENT, 15). - --record(state, { - packet_id = 1, - socket :: gen_tcp:socket(), - micro_service_id :: undefined | binary(), - is_registered = false :: boolean(), - - %% 请求响应的对应关系, #{packet_id => {ReceiverPid, Ref}} - inflight = #{} -}). - -%%%=================================================================== -%%% API -%%%=================================================================== - --spec push_param(ChannelPid :: pid(), Params :: binary()) -> {ok, Ref :: reference()}. -push_param(ChannelPid, Params) when is_pid(ChannelPid), is_binary(Params) -> - Ref = make_ref(), - gen_server:cast(ChannelPid, {push_param, Ref, self(), Params}), - {ok, Ref}. - --spec push_metric(ChannelPid :: pid(), Metrics :: binary()) -> {ok, Ref :: reference()}. -push_metric(ChannelPid, Metrics) when is_pid(ChannelPid), is_binary(Metrics) -> - Ref = make_ref(), - gen_server:cast(ChannelPid, {push_metric, Ref, self(), Metrics}), - {ok, Ref}. - --spec poll(ChannelPid :: pid(), Command :: binary()) -> {ok, Ref :: reference()}. -poll(ChannelPid, Command) when is_pid(ChannelPid), is_binary(Command) -> - Ref = make_ref(), - gen_server:cast(ChannelPid, {poll, Ref, self(), Command}), - {ok, Ref}. - -%% @doc Spawns the server and registers the local name (unique) --spec(start_link(Socket :: gen_tcp:socket()) -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(Socket) -> - gen_server:start_link(?MODULE, [Socket], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([Socket]) -> - ok = inet:setopts(Socket, [{active, true}]), - lager:debug("[tcp_channel] get new socket: ~p", [Socket]), - {ok, #state{socket = Socket}}. - -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call(_Request, _From, State = #state{}) -> - {reply, ok, State}. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -%% 推送参数项目 -handle_cast({push_param, Ref, ReceiverPid, Params}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - ok = gen_tcp:send(Socket, <>), - {noreply, State#state{inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; -%% 推送采集项目 -handle_cast({push_metric, Ref, ReceiverPid, Metrics}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - ok = gen_tcp:send(Socket, <>), - {noreply, State#state{inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; -%% 推送poll请求 -handle_cast({poll, Ref, ReceiverPid, Command}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - ok = gen_tcp:send(Socket, <>), - {noreply, State#state{inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; - -handle_cast(_Request, State = #state{}) -> - {noreply, State}. - -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -%% 注册 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket}) -> - ok = gen_tcp:send(Socket, <>), - {noreply, State#state{micro_service_id = MicroServiceId, is_registered = true}}; - -handle_info({tcp, Socket, <>}, State = #state{socket = Socket}) -> - lager:debug("[tcp_channel] get metric data: ~p", [Body]), - - ok = gen_tcp:send(Socket, <>), - {noreply, State}; - -handle_info({tcp, Socket, <>}, State = #state{socket = Socket}) -> - ok = gen_tcp:send(Socket, <>), - {noreply, State}; - -handle_info({tcp, Socket, <>}, State = #state{socket = Socket}) -> - ok = gen_tcp:send(Socket, <>), - {noreply, State}; - -%% 收到端上的响应 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, inflight = Inflight}) -> - case maps:take(PacketId, Inflight) of - error -> - lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Response, PacketId]), - {noreply, State}; - {{ReceiverPid, Ref}, NInflight} -> - case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of - true -> - case Response of - <<1:8, Result/binary>> -> - ReceiverPid ! {channel_reply, Ref, {ok, Result}}; - <<0:8, Error/binary>> -> - ReceiverPid ! {channel_reply, Ref, {error, Error}} - end; - false -> - lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Response, PacketId]) - end, - {noreply, State#state{inflight = NInflight}} - end; - -handle_info(Info, State = #state{}) -> - lager:debug("[tcp_channel] get info: ~p", [Info]), - {noreply, State}. - -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(_Reason, _State = #state{}) -> - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%===================================================================