From e59a40e142043e7f0aed0690f84d8590b412ffce Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 16 Sep 2025 18:46:28 +0800 Subject: [PATCH] remove tcp channel --- apps/efka/src/efka_sup.erl | 9 - apps/efka/src/gen_channel.erl | 90 -------- apps/efka/src/tcp_server/tcp_channel.erl | 201 ------------------ .../src/tcp_server/tcp_connection_sup.erl | 43 ---- apps/efka/src/tcp_server/tcp_server.erl | 46 ---- apps/efka/src/tcp_server/tcp_sup.erl | 72 ------- apps/efka/src/websocket_server/ws_channel.erl | 73 ++++++- 7 files changed, 72 insertions(+), 462 deletions(-) delete mode 100644 apps/efka/src/gen_channel.erl delete mode 100644 apps/efka/src/tcp_server/tcp_channel.erl delete mode 100644 apps/efka/src/tcp_server/tcp_connection_sup.erl delete mode 100644 apps/efka/src/tcp_server/tcp_server.erl delete mode 100644 apps/efka/src/tcp_server/tcp_sup.erl diff --git a/apps/efka/src/efka_sup.erl b/apps/efka/src/efka_sup.erl index 6413735..5ae13c2 100644 --- a/apps/efka/src/efka_sup.erl +++ b/apps/efka/src/efka_sup.erl @@ -82,15 +82,6 @@ init([]) -> % modules => ['efka_remote_agent'] %}, - #{ - id => 'tcp_sup', - start => {'tcp_sup', start_link, []}, - restart => permanent, - shutdown => 2000, - type => supervisor, - modules => ['tcp_sup'] - }, - #{ id => 'efka_service_sup', start => {'efka_service_sup', start_link, []}, diff --git a/apps/efka/src/gen_channel.erl b/apps/efka/src/gen_channel.erl deleted file mode 100644 index 91adb55..0000000 --- a/apps/efka/src/gen_channel.erl +++ /dev/null @@ -1,90 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author anlicheng -%%% @copyright (C) 2025, -%%% @doc -%%% -%%% @end -%%% Created : 27. 8月 2025 15:22 -%%%------------------------------------------------------------------- --module(gen_channel). --author("anlicheng"). --include("efka_service.hrl"). - --export([register/2]). --export([push_config/4, invoke/4, channel_reply/3]). --export([next_packet_id/1]). --export([json_result/2, json_error/3]). - -%%%=================================================================== -%%% API -%%%=================================================================== - --spec push_config(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), ConfigJson :: binary()) -> no_return(). -push_config(ChannelPid, Ref, ReceiverPid, ConfigJson) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(ConfigJson), is_reference(Ref) -> - ChannelPid ! {push_config, Ref, ReceiverPid, ConfigJson}. - --spec invoke(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), Payload :: binary()) -> no_return(). -invoke(ChannelPid, Ref, ReceiverPid, Payload) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Payload), is_reference(Ref) -> - ChannelPid ! {invoke, Ref, ReceiverPid, Payload}. - -%% 超时逻辑处理 -channel_reply(Id, Reply, Inflight) -> - case maps:take(Id, Inflight) of - error -> - Inflight; - {{ReceiverPid, Ref}, NInflight} -> - case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of - true -> - ReceiverPid ! {channel_reply, Ref, Reply}; - false -> - ok - end, - NInflight - end. - -%% 注册 --spec register(Id :: integer(), ServiceId :: binary()) -> {error, Reply :: binary()} | {ok, Reply :: binary(), ServicePid :: pid()}. -register(Id, ServiceId) when is_integer(Id), is_binary(ServiceId) -> - case efka_service:get_pid(ServiceId) of - undefined -> - lager:warning("[gen_channel] service_id: ~p, not running", [ServiceId]), - Reply = json_error(Id, -1, <<"service not running">>), - {error, Reply}; - ServicePid when is_pid(ServicePid) -> - case efka_service:attach_channel(ServicePid, self()) of - ok -> - Reply = json_result(Id, <<"ok">>), - erlang:monitor(process, ServicePid), - {ok, Reply, ServicePid}; - {error, Error} -> - lager:warning("[gen_channel] service_id: ~p, attach_channel get error: ~p", [ServiceId, Error]), - Reply = json_error(Id, -1, Error), - {error, Reply} - end - end. - -%% 采用32位编码 --spec next_packet_id(PacketId :: integer()) -> NextPacketId :: integer(). -next_packet_id(PacketId) when PacketId >= 4294967295 -> - 1; -next_packet_id(PacketId) -> - PacketId + 1. - --spec json_result(Id :: integer(), Result :: term()) -> binary(). -json_result(Id, Result) when is_integer(Id) -> - Response = #{ - <<"id">> => Id, - <<"result">> => Result - }, - jiffy:encode(Response, [force_utf8]). - --spec json_error(Id :: integer(), Code :: integer(), Message :: binary()) -> binary(). -json_error(Id, Code, Message) when is_integer(Id), is_integer(Code), is_binary(Message) -> - Response = #{ - <<"id">> => Id, - <<"error">> => #{ - <<"code">> => Code, - <<"message">> => Message - } - }, - jiffy:encode(Response, [force_utf8]). \ No newline at end of file diff --git a/apps/efka/src/tcp_server/tcp_channel.erl b/apps/efka/src/tcp_server/tcp_channel.erl deleted file mode 100644 index 519d949..0000000 --- a/apps/efka/src/tcp_server/tcp_channel.erl +++ /dev/null @@ -1,201 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author anlicheng -%%% @copyright (C) 2025, -%%% @doc -%%% -%%% @end -%%% Created : 30. 4月 2025 09:22 -%%%------------------------------------------------------------------- --module(tcp_channel). --author("anlicheng"). --include("efka_service.hrl"). - --behaviour(gen_server). - -%% API --export([start_link/1]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --define(SERVER, ?MODULE). - -%% 最大的等待时间 --define(PENDING_TIMEOUT, 10 * 1000). -%% 消息类型 - --record(state, { - packet_id = 1, - socket :: gen_tcp:socket(), - service_id :: undefined | binary(), - service_pid :: undefined | pid(), - is_registered = false :: boolean(), - - %% 请求响应的对应关系, #{packet_id => {ReceiverPid, Ref}}; 自身的inflight需要超时逻辑处理 - inflight = #{} -}). - -%%%=================================================================== -%%% API -%%%=================================================================== - -%% @doc Spawns the server and registers the local name (unique) --spec(start_link(Socket :: gen_tcp:socket()) -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(Socket) -> - gen_server:start_link(?MODULE, [Socket], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([Socket]) -> - ok = inet:setopts(Socket, [{active, true}]), - lager:debug("[efka_tcp_channel] get micro service socket: ~p", [Socket]), - {ok, #state{socket = Socket}}. - -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call(_Request, _From, State = #state{}) -> - {reply, ok, State}. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_cast(_Request, State = #state{}) -> - {noreply, State}. - -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). - -%% 推送配置项目 -handle_info({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - PushConfig = #{<<"id">> => PacketId, <<"method">> => <<"push_config">>, <<"params">> => #{<<"config">> => ConfigJson}}, - Packet = jiffy:encode(PushConfig, [force_utf8]), - ok = gen_tcp:send(Socket, <>), - - erlang:start_timer(?PENDING_TIMEOUT, self(), {pending_timeout, PacketId}), - {noreply, State#state{packet_id = gen_channel:next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; - -%% 远程调用 -handle_info({invoke, Ref, ReceiverPid, Payload}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - PushConfig = #{<<"id">> => PacketId, <<"method">> => <<"invoke">>, <<"params">> => #{<<"payload">> => Payload}}, - Packet = jiffy:encode(PushConfig, [force_utf8]), - ok = gen_tcp:send(Socket, <>), - - erlang:start_timer(?PENDING_TIMEOUT, self(), {pending_timeout, PacketId}), - {noreply, State#state{packet_id = gen_channel:next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; - -%% 处理micro-client:request => efka 主动的请求 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket}) -> - Request = jiffy:decode(Data, [return_maps]), - handle_request(Request, State); - -%% 处理micro-client:response => efka 的响应 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, inflight = Inflight}) -> - Resp = jiffy:decode(Data, [return_maps]), - {PacketId, Reply} = case Resp of - #{<<"id">> := Id, <<"result">> := Result} -> - {Id, {ok, Result}}; - #{<<"id">> := Id, <<"error">> := #{<<"code">> := _Code, <<"message">> := Error}} -> - {Id, {error, Error}} - end, - - NInflight = gen_channel:channel_reply(PacketId, Reply, Inflight), - {noreply, State#state{inflight = NInflight}}; - -%% 超时逻辑处理 -handle_info({timeout, _, {pending_timeout, Id}}, State = #state{inflight = Inflight}) -> - NInflight = gen_channel:channel_reply(Id, {error, <<"timeout">>}, Inflight), - {noreply, State#state{inflight = NInflight}}; - -%% 订阅的消息 -handle_info({topic_broadcast, Topic, Content}, State = #state{socket = Socket}) -> - Packet = jiffy:encode(#{<<"topic">> => Topic, <<"content">> => Content}, [force_utf8]), - ok = gen_tcp:send(Socket, <>), - - {noreply, State}; - -%% service进程关闭 -handle_info({'DOWN', _Ref, process, ServicePid, Reason}, State = #state{service_pid = ServicePid}) -> - lager:debug("[tcp_channel] service_pid: ~p, exited: ~p", [ServicePid, Reason]), - {stop, normal, State#state{service_pid = undefined}}; - -handle_info({tcp_error, Socket, Reason}, State = #state{socket = Socket, service_id = ServiceId}) -> - lager:debug("[tcp_channel] tcp_error: ~p, assoc service: ~p", [Reason, ServiceId]), - {stop, normal, State}; -handle_info({tcp_closed, Socket}, State = #state{socket = Socket, service_id = ServiceId}) -> - lager:debug("[tcp_channel] tcp_closed: ~p, assoc service: ~p", [Socket, ServiceId]), - {stop, normal, State}. - -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(_Reason, _State = #state{}) -> - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== - -%% 注册 -handle_request(#{<<"id">> := Id, <<"method">> := <<"register">>, <<"params">> := #{<<"service_id">> := ServiceId}}, State = #state{socket = Socket}) -> - case gen_channel:register(Id, ServiceId) of - {error, Reply} -> - ok = gen_tcp:send(Socket, <>), - {stop, normal, State}; - {ok, Reply, ServicePid} -> - ok = gen_tcp:send(Socket, <>), - {noreply, State#state{service_id = ServiceId, service_pid = ServicePid, is_registered = true}} - end; -%% 请求参数 -handle_request(#{<<"id">> := Id, <<"method">> := <<"request_config">>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> - {ok, ConfigJson} = efka_service:request_config(ServicePid), - Packet = gen_channel:json_result(Id, ConfigJson), - ok = gen_tcp:send(Socket, <>), - {noreply, State}; -%% 数据项 -handle_request(#{<<"id">> := 0, <<"method">> := <<"metric_data">>, - <<"params">> := #{<<"device_uuid">> := DeviceUUID, <<"route_key">> := RouteKey, <<"metric">> := Metric}}, State = #state{service_pid = ServicePid, is_registered = true}) -> - efka_service:metric_data(ServicePid, DeviceUUID, RouteKey, Metric), - {noreply, State}; -%% Event事件 -handle_request(#{<<"id">> := 0, <<"method">> := <<"event">>, <<"params">> := #{<<"event_type">> := EventType, <<"body">> := Body}}, State = #state{service_pid = ServicePid, is_registered = true}) -> - efka_service:send_event(ServicePid, EventType, Body), - {noreply, State}; -%% 订阅事件 -handle_request(#{<<"id">> := 0, <<"method">> := <<"subscribe">>, <<"params">> := #{<<"topic">> := Topic}}, State = #state{is_registered = true}) -> - efka_subscription:subscribe(Topic, self()), - {noreply, State}. \ No newline at end of file diff --git a/apps/efka/src/tcp_server/tcp_connection_sup.erl b/apps/efka/src/tcp_server/tcp_connection_sup.erl deleted file mode 100644 index ccfeadf..0000000 --- a/apps/efka/src/tcp_server/tcp_connection_sup.erl +++ /dev/null @@ -1,43 +0,0 @@ -%%%------------------------------------------------------------------- -%% @doc efka top level supervisor. -%% @end -%%%------------------------------------------------------------------- - --module(tcp_connection_sup). - --behaviour(supervisor). - --export([start_link/0, start_child/1]). - --export([init/1]). - --define(SERVER, ?MODULE). - -start_link() -> - supervisor:start_link({local, ?SERVER}, ?MODULE, []). - -%% sup_flags() = #{strategy => strategy(), % optional -%% intensity => non_neg_integer(), % optional -%% period => pos_integer()} % optional -%% child_spec() = #{id => child_id(), % mandatory -%% start => mfargs(), % mandatory -%% restart => restart(), % optional -%% shutdown => shutdown(), % optional -%% type => worker(), % optional -%% modules => modules()} % optional -init([]) -> - SupFlags = #{strategy => simple_one_for_one, intensity => 0, period => 1}, - ChildSpec = #{ - id => tcp_channel, - start => {tcp_channel, start_link, []}, - restart => temporary, - type => worker - }, - {ok, {SupFlags, [ChildSpec]}}. - -%% internal functions - -start_child(Socket) -> - supervisor:start_child(?MODULE, [Socket]). - - diff --git a/apps/efka/src/tcp_server/tcp_server.erl b/apps/efka/src/tcp_server/tcp_server.erl deleted file mode 100644 index 5a9de03..0000000 --- a/apps/efka/src/tcp_server/tcp_server.erl +++ /dev/null @@ -1,46 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author anlicheng -%%% @copyright (C) 2025, -%%% @doc -%%% -%%% @end -%%% Created : 29. 4月 2025 23:24 -%%%------------------------------------------------------------------- --module(tcp_server). --author("anlicheng"). - -%% API --export([start_link/0, init/0]). - -start_link() -> - {ok, spawn_link(?MODULE, init, [])}. - -%% 监听循环 -init() -> - {ok, TcpServerProps} = application:get_env(efka, tcp_server), - Port = proplists:get_value(port, TcpServerProps), - - case gen_tcp:listen(Port, [binary, {packet, 4}, {active, false}, {reuseaddr, true}]) of - {ok, ListenSocket} -> - lager:debug("[efka_tcp_server] Server started on port ~p~n", [Port]), - main_loop(ListenSocket); - {error, Reason} -> - lager:debug("[efka_tcp_server] Failed to start server: ~p~n", [Reason]), - exit(Reason) - end. - -main_loop(ListenSocket) -> - case gen_tcp:accept(ListenSocket) of - {ok, Socket} -> - % 为每个新连接生成一个处理进程 - {ok, ChannelPid} = tcp_connection_sup:start_child(Socket), - ok = gen_tcp:controlling_process(Socket, ChannelPid), - % 继续监听下一个连接 - main_loop(ListenSocket); - {error, closed} -> - lager:debug("[efka_tcp_server] Server socket closed"), - exit(tcp_closed); - {error, Reason} -> - lager:debug("[efka_tcp_server] Accept error: ~p", [Reason]), - exit(Reason) - end. \ No newline at end of file diff --git a/apps/efka/src/tcp_server/tcp_sup.erl b/apps/efka/src/tcp_server/tcp_sup.erl deleted file mode 100644 index 159e591..0000000 --- a/apps/efka/src/tcp_server/tcp_sup.erl +++ /dev/null @@ -1,72 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author anlicheng -%%% @copyright (C) 2025, -%%% @doc -%%% -%%% @end -%%% Created : 26. 8月 2025 14:36 -%%%------------------------------------------------------------------- --module(tcp_sup). --author("anlicheng"). - --behaviour(supervisor). - -%% API --export([start_link/0]). - -%% Supervisor callbacks --export([init/1]). - --define(SERVER, ?MODULE). - -%%%=================================================================== -%%% API functions -%%%=================================================================== - -%% @doc Starts the supervisor --spec(start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link() -> - supervisor:start_link({local, ?SERVER}, ?MODULE, []). - -%%%=================================================================== -%%% Supervisor callbacks -%%%=================================================================== - -%% @private -%% @doc Whenever a supervisor is started using supervisor:start_link/[2,3], -%% this function is called by the new process to find out about -%% restart strategy, maximum restart frequency and child -%% specifications. --spec(init(Args :: term()) -> - {ok, {SupFlags :: {RestartStrategy :: supervisor:strategy(), - MaxR :: non_neg_integer(), MaxT :: non_neg_integer()}, - [ChildSpec :: supervisor:child_spec()]}} - | ignore | {error, Reason :: term()}). -init([]) -> - SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, - - Specs = [ - #{ - id => 'tcp_connection_sup', - start => {'tcp_connection_sup', start_link, []}, - restart => permanent, - shutdown => 2000, - type => supervisor, - modules => ['tcp_connection_sup'] - }, - - #{ - id => 'tcp_server', - start => {'tcp_server', start_link, []}, - restart => permanent, - shutdown => 2000, - type => worker, - modules => ['tcp_server'] - } - ], - - {ok, {SupFlags, Specs}}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== diff --git a/apps/efka/src/websocket_server/ws_channel.erl b/apps/efka/src/websocket_server/ws_channel.erl index 5b32a4e..0dd9393 100644 --- a/apps/efka/src/websocket_server/ws_channel.erl +++ b/apps/efka/src/websocket_server/ws_channel.erl @@ -13,6 +13,7 @@ %% API -export([init/2]). -export([websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]). +-export([push_config/4, invoke/4, channel_reply/3, register/2]). %% 最大的等待时间 -define(PENDING_TIMEOUT, 10 * 1000). @@ -27,6 +28,50 @@ inflight = #{} }). +-spec push_config(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), ConfigJson :: binary()) -> no_return(). +push_config(ChannelPid, Ref, ReceiverPid, ConfigJson) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(ConfigJson), is_reference(Ref) -> + ChannelPid ! {push_config, Ref, ReceiverPid, ConfigJson}. + +-spec invoke(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), Payload :: binary()) -> no_return(). +invoke(ChannelPid, Ref, ReceiverPid, Payload) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Payload), is_reference(Ref) -> + ChannelPid ! {invoke, Ref, ReceiverPid, Payload}. + +%% 超时逻辑处理 +channel_reply(Id, Reply, Inflight) -> + case maps:take(Id, Inflight) of + error -> + Inflight; + {{ReceiverPid, Ref}, NInflight} -> + case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of + true -> + ReceiverPid ! {channel_reply, Ref, Reply}; + false -> + ok + end, + NInflight + end. + +%% 注册 +-spec register(Id :: integer(), ServiceId :: binary()) -> {error, Reply :: binary()} | {ok, Reply :: binary(), ServicePid :: pid()}. +register(Id, ServiceId) when is_integer(Id), is_binary(ServiceId) -> + case efka_service:get_pid(ServiceId) of + undefined -> + lager:warning("[gen_channel] service_id: ~p, not running", [ServiceId]), + Reply = json_error(Id, -1, <<"service not running">>), + {error, Reply}; + ServicePid when is_pid(ServicePid) -> + case efka_service:attach_channel(ServicePid, self()) of + ok -> + Reply = json_result(Id, <<"ok">>), + erlang:monitor(process, ServicePid), + {ok, Reply, ServicePid}; + {error, Error} -> + lager:warning("[gen_channel] service_id: ~p, attach_channel get error: ~p", [ServiceId, Error]), + Reply = json_error(Id, -1, Error), + {error, Reply} + end + end. + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% 逻辑处理方法 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -147,4 +192,30 @@ handle_request(#{<<"id">> := 0, <<"method">> := <<"subscribe">>, <<"params">> := {ok, State}. delay_stop(Timeout, Reason) when is_integer(Timeout) -> - erlang:start_timer(Timeout, self(), {stop, Reason}). \ No newline at end of file + erlang:start_timer(Timeout, self(), {stop, Reason}). + +%% 采用32位编码 +-spec next_packet_id(PacketId :: integer()) -> NextPacketId :: integer(). +next_packet_id(PacketId) when PacketId >= 4294967295 -> + 1; +next_packet_id(PacketId) -> + PacketId + 1. + +-spec json_result(Id :: integer(), Result :: term()) -> binary(). +json_result(Id, Result) when is_integer(Id) -> + Response = #{ + <<"id">> => Id, + <<"result">> => Result + }, + jiffy:encode(Response, [force_utf8]). + +-spec json_error(Id :: integer(), Code :: integer(), Message :: binary()) -> binary(). +json_error(Id, Code, Message) when is_integer(Id), is_integer(Code), is_binary(Message) -> + Response = #{ + <<"id">> => Id, + <<"error">> => #{ + <<"code">> => Code, + <<"message">> => Message + } + }, + jiffy:encode(Response, [force_utf8]). \ No newline at end of file