This commit is contained in:
anlicheng 2025-09-18 15:02:31 +08:00
parent 63562f2e6a
commit fb0ad40a44
7 changed files with 96 additions and 120 deletions

View File

@ -15,32 +15,21 @@
%% efka主动发起不需要返回的数据 %% efka主动发起不需要返回的数据
-define(PACKET_CAST, 16#03). -define(PACKET_CAST, 16#03).
%% pub/sub的消息,
-define(PACKET_PUB, 16#04).
%% push调用不需要返回,
-define(PACKET_COMMAND, 16#05).
%% RPC调用
%% Service通讯,
-define(PACKET_RPC, 16#10).
-define(PACKET_RPC_REPLY, 16#11).
%% ping包
-define(PACKET_PING, 16#FF).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%% %%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% %%
-define(MESSAGE_AUTH_REQUEST, 16#01). -define(MESSAGE_AUTH_REQUEST, 16#01).
-define(MESSAGE_AUTH_REPLY, 16#02). -define(MESSAGE_PUB, 16#02).
-define(MESSAGE_PUB, 16#03). -define(MESSAGE_COMMAND, 16#03).
-define(MESSAGE_COMMAND, 16#04). -define(MESSAGE_RPC_DEPLOY, 16#04).
-define(MESSAGE_RPC_DEPLOY, 16#05). -define(MESSAGE_RPC_CONTAINER, 16#05).
-define(MESSAGE_RPC_CONTAINER, 16#06). -define(MESSAGE_DATA, 16#06).
-define(MESSAGE_DATA, 16#07). -define(MESSAGE_EVENT, 16#07).
-define(MESSAGE_EVENT, 16#08).
%%
-define(MESSAGE_RPC_REPLY, 16#FF).
%%%% , %%%% ,
%% %%
@ -54,11 +43,6 @@
timestamp :: integer() timestamp :: integer()
}). }).
-record(auth_reply, {
code :: integer(),
message :: binary()
}).
-record(pub, { -record(pub, {
topic :: binary(), topic :: binary(),
content :: binary() content :: binary()
@ -80,6 +64,11 @@
params = <<>> :: binary() params = <<>> :: binary()
}). }).
-record(rpc_reply, {
code :: integer(),
payload :: binary()
}).
-record(data, { -record(data, {
service_id :: binary(), service_id :: binary(),
device_uuid :: binary(), device_uuid :: binary(),

View File

@ -6,7 +6,7 @@
%%% @end %%% @end
%%% Created : 26. 4 2020 3:36 %%% Created : 26. 4 2020 3:36
%%%------------------------------------------------------------------- %%%-------------------------------------------------------------------
-module(service_handler). -module(container_handler).
-author("licheng5"). -author("licheng5").
-include("iot.hrl"). -include("iot.hrl").
@ -14,7 +14,7 @@
-export([handle_request/4]). -export([handle_request/4]).
%% config.json, %% config.json,
handle_request("POST", "/service/push_config", _, handle_request("POST", "/container/push_config", _,
#{<<"uuid">> := UUID, <<"service_id">> := ServiceId, <<"config_json">> := ConfigJson, <<"timeout">> := Timeout0}) #{<<"uuid">> := UUID, <<"service_id">> := ServiceId, <<"config_json">> := ConfigJson, <<"timeout">> := Timeout0})
when is_binary(UUID), is_binary(ServiceId), is_binary(ConfigJson), is_integer(Timeout0) -> when is_binary(UUID), is_binary(ServiceId), is_binary(ConfigJson), is_integer(Timeout0) ->
@ -39,7 +39,7 @@ handle_request("POST", "/service/push_config", _,
end; end;
%% %%
handle_request("POST", "/service/deploy", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId, <<"service_id">> := ServiceId, <<"tar_url">> := TarUrl}) handle_request("POST", "/container/deploy", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId, <<"service_id">> := ServiceId, <<"tar_url">> := TarUrl})
when is_binary(UUID), is_integer(TaskId), is_binary(ServiceId), is_binary(TarUrl) -> when is_binary(UUID), is_integer(TaskId), is_binary(ServiceId), is_binary(TarUrl) ->
case iot_host:get_pid(UUID) of case iot_host:get_pid(UUID) of
@ -60,7 +60,7 @@ handle_request("POST", "/service/deploy", _, #{<<"uuid">> := UUID, <<"task_id">>
end; end;
%% %%
handle_request("POST", "/service/start", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId}) when is_binary(UUID), is_binary(ServiceId) -> handle_request("POST", "/container/start", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId}) when is_binary(UUID), is_binary(ServiceId) ->
case iot_host:get_pid(UUID) of case iot_host:get_pid(UUID) of
undefined -> undefined ->
{ok, 200, iot_util:json_error(404, <<"host not found">>)}; {ok, 200, iot_util:json_error(404, <<"host not found">>)};
@ -79,7 +79,7 @@ handle_request("POST", "/service/start", _, #{<<"uuid">> := UUID, <<"service_id"
end; end;
%% %%
handle_request("POST", "/service/stop", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId}) when is_binary(UUID), is_binary(ServiceId) -> handle_request("POST", "/container/stop", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId}) when is_binary(UUID), is_binary(ServiceId) ->
case iot_host:get_pid(UUID) of case iot_host:get_pid(UUID) of
undefined -> undefined ->
{ok, 200, iot_util:json_error(404, <<"host not found">>)}; {ok, 200, iot_util:json_error(404, <<"host not found">>)};
@ -119,7 +119,7 @@ handle_request("POST", "/service/invoke", _, #{<<"uuid">> := UUID, <<"service_id
end end
end; end;
handle_request("POST", "/service/task_log", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId}) when is_binary(UUID), is_integer(TaskId) -> handle_request("POST", "/container/task_log", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId}) when is_binary(UUID), is_integer(TaskId) ->
case iot_host:get_pid(UUID) of case iot_host:get_pid(UUID) of
undefined -> undefined ->
{ok, 200, iot_util:json_error(404, <<"host not found">>)}; {ok, 200, iot_util:json_error(404, <<"host not found">>)};

View File

@ -6,39 +6,12 @@
%%% @end %%% @end
%%% Created : 08. 5 2025 13:00 %%% Created : 08. 5 2025 13:00
%%%------------------------------------------------------------------- %%%-------------------------------------------------------------------
-module(http_server). -module(http_protocol).
-author("anlicheng"). -author("anlicheng").
%% API %% API
-export([start/0]).
-export([init/2]). -export([init/2]).
%% http服务
start() ->
{ok, Props} = application:get_env(iot, http_server),
Acceptors = proplists:get_value(acceptors, Props, 50),
MaxConnections = proplists:get_value(max_connections, Props, 10240),
Backlog = proplists:get_value(backlog, Props, 1024),
Port = proplists:get_value(port, Props),
Dispatcher = cowboy_router:compile([
{'_', [
{"/host/[...]", ?MODULE, [host_handler]},
{"/service/[...]", ?MODULE, [service_handler]},
{"/device/[...]", ?MODULE, [device_handler]}
]}
]),
TransOpts = [
{port, Port},
{num_acceptors, Acceptors},
{backlog, Backlog},
{max_connections, MaxConnections}
],
{ok, Pid} = cowboy:start_clear(http_listener, TransOpts, #{env => #{dispatch => Dispatcher}}),
lager:debug("[http_server] the http server start at: ~p, pid is: ~p", [Port, Pid]).
init(Req0, Opts = [Mod|_]) -> init(Req0, Opts = [Mod|_]) ->
Method = binary_to_list(cowboy_req:method(Req0)), Method = binary_to_list(cowboy_req:method(Req0)),
Path = binary_to_list(cowboy_req:path(Req0)), Path = binary_to_list(cowboy_req:path(Req0)),

View File

@ -16,7 +16,7 @@ start(_StartType, _StartArgs) ->
start_mnesia(), start_mnesia(),
%% http服务 %% http服务
http_server:start(), start_http_server(),
%% tcp服务 %% tcp服务
tcp_server:start(), tcp_server:start(),
@ -38,6 +38,31 @@ start_mnesia() ->
%% %%
ok. ok.
start_http_server() ->
{ok, Props} = application:get_env(iot, http_server),
Acceptors = proplists:get_value(acceptors, Props, 50),
MaxConnections = proplists:get_value(max_connections, Props, 10240),
Backlog = proplists:get_value(backlog, Props, 1024),
Port = proplists:get_value(port, Props),
Dispatcher = cowboy_router:compile([
{'_', [
{"/host/[...]", http_protocol, [host_handler]},
{"/container/[...]", http_protocol, [container_handler]},
{"/device/[...]", http_protocol, [device_handler]}
]}
]),
TransOpts = [
{port, Port},
{num_acceptors, Acceptors},
{backlog, Backlog},
{max_connections, MaxConnections}
],
{ok, Pid} = cowboy:start_clear(http_listener, TransOpts, #{env => #{dispatch => Dispatcher}}),
lager:debug("[http_server] the http server start at: ~p, pid is: ~p", [Port, Pid]).
-spec ensure_mnesia_schema() -> any(). -spec ensure_mnesia_schema() -> any().
ensure_mnesia_schema() -> ensure_mnesia_schema() ->
case mnesia:system_info(use_dir) of case mnesia:system_info(use_dir) of

View File

@ -9,7 +9,7 @@
-module(iot_host). -module(iot_host).
-author("aresei"). -author("aresei").
-include("iot.hrl"). -include("iot.hrl").
-include("message_pb.hrl"). -include("message.hrl").
-behaviour(gen_statem). -behaviour(gen_statem).
@ -25,7 +25,7 @@
-export([get_metric/1, get_status/1]). -export([get_metric/1, get_status/1]).
%% %%
-export([pub/3, attach_channel/2, command/3]). -export([pub/3, attach_channel/2, command/3]).
-export([deploy_service/4, start_service/2, stop_service/2, invoke_service/4, async_service_config/4, task_log/2, await_reply/2]). -export([deploy_container/3, start_container/2, stop_container/2, config_container/3, await_reply/2]).
%% %%
-export([reload_device/2, delete_device/2, activate_device/3]). -export([reload_device/2, delete_device/2, activate_device/3]).
-export([heartbeat/1]). -export([heartbeat/1]).
@ -89,40 +89,37 @@ get_metric(Pid) when is_pid(Pid) ->
attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) -> attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) ->
gen_statem:call(Pid, {attach_channel, ChannelPid}). gen_statem:call(Pid, {attach_channel, ChannelPid}).
-spec async_service_config(Pid :: pid(), ServiceId :: binary(), ConfigJson :: binary(), Timeout :: integer()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. -spec config_container(Pid :: pid(), ContainerName :: binary(), ConfigJson :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
async_service_config(Pid, ServiceId, ConfigJson, Timeout) when is_pid(Pid), is_binary(ServiceId), is_binary(ConfigJson), is_integer(Timeout) -> config_container(Pid, ContainerName, ConfigJson) when is_pid(Pid), is_binary(ContainerName), is_binary(ConfigJson) ->
ConfigBin = message_pb:encode_msg(#push_service_config{service_id = ServiceId, config_json = ConfigJson, timeout = Timeout}), EncConfigBin = message_codec:encode(?MESSAGE_RPC_CONTAINER, #rpc_container{method = <<"config">>, container_name = ContainerName, params = ConfigJson}),
gen_statem:call(Pid, {async_call, self(), ?PUSH_SERVICE_CONFIG, ConfigBin}). gen_statem:call(Pid, {rpc_call, self(), EncConfigBin}).
-spec deploy_service(Pid :: pid(), TaskId :: integer(), ServiceId :: binary(), TarUrl :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. -spec deploy_container(Pid :: pid(), TaskId :: integer(), Config :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
deploy_service(Pid, TaskId, ServiceId, TarUrl) when is_pid(Pid), is_integer(TaskId), is_binary(ServiceId), is_binary(TarUrl) -> deploy_container(Pid, TaskId, Config) when is_pid(Pid), is_integer(TaskId), is_binary(Config) ->
PushBin = message_pb:encode_msg(#deploy{task_id = TaskId, service_id = ServiceId, tar_url = TarUrl}), EncDeployBin = message_codec:encode(?MESSAGE_RPC_DEPLOY, #rpc_deploy{task_id = TaskId, config = Config}),
gen_statem:call(Pid, {async_call, self(), ?PUSH_DEPLOY, PushBin}). gen_statem:call(Pid, {rpc_call, self(), ?PUSH_DEPLOY, EncDeployBin}).
-spec start_service(Pid :: pid(), ServiceId :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. -spec start_container(Pid :: pid(), ContainerName :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
start_service(Pid, ServiceId) when is_pid(Pid), is_binary(ServiceId) -> start_container(Pid, ContainerName) when is_pid(Pid), is_binary(ContainerName) ->
gen_statem:call(Pid, {async_call, self(), ?PUSH_START_SERVICE, ServiceId}). EncCallBin = message_codec:encode(?MESSAGE_RPC_CONTAINER, #rpc_container{method = <<"start">>, container_name = ContainerName}),
gen_statem:call(Pid, {rpc_call, self(), EncCallBin}).
-spec stop_service(Pid :: pid(), ServiceId :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. -spec stop_container(Pid :: pid(), ContainerName :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
stop_service(Pid, ServiceId) when is_pid(Pid), is_binary(ServiceId) -> stop_container(Pid, ContainerName) when is_pid(Pid), is_binary(ContainerName) ->
gen_statem:call(Pid, {async_call, self(), ?PUSH_STOP_SERVICE, ServiceId}). EncCallBin = message_codec:encode(?MESSAGE_RPC_CONTAINER, #rpc_container{method = <<"stop">>, container_name = ContainerName}),
gen_statem:call(Pid, {rpc_call, self(), EncCallBin}).
-spec invoke_service(Pid :: pid(), ServiceId :: binary(), Payload :: binary(), Timeout :: integer()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. %-spec task_log(Pid :: pid(), TaskId :: integer()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
invoke_service(Pid, ServiceId, Payload, Timeout) when is_pid(Pid), is_binary(ServiceId), is_binary(Payload), is_integer(Timeout) -> %task_log(Pid, TaskId) when is_pid(Pid), is_integer(TaskId) ->
InvokeBin = message_pb:encode_msg(#invoke{service_id = ServiceId, payload = Payload, timeout = Timeout}), % TaskLogBin = message_pb:encode_msg(#fetch_task_log{task_id = TaskId}),
gen_statem:call(Pid, {async_call, self(), ?PUSH_INVOKE, InvokeBin}). % gen_statem:call(Pid, {rpc_call, self(), ?PUSH_TASK_LOG, TaskLogBin}).
-spec task_log(Pid :: pid(), TaskId :: integer()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
task_log(Pid, TaskId) when is_pid(Pid), is_integer(TaskId) ->
TaskLogBin = message_pb:encode_msg(#fetch_task_log{task_id = TaskId}),
gen_statem:call(Pid, {async_call, self(), ?PUSH_TASK_LOG, TaskLogBin}).
-spec await_reply(Ref :: reference(), Timeout :: integer()) -> {ok, Result :: binary()} | {error, Reason :: binary()}. -spec await_reply(Ref :: reference(), Timeout :: integer()) -> {ok, Result :: binary()} | {error, Reason :: binary()}.
await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) -> await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) ->
receive receive
{async_call_reply, Ref, #async_call_reply{code = 1, result = Result}} -> {rpc_reply, Ref, #rpc_reply{code = 1, payload = Result}} ->
{ok, Result}; {ok, Result};
{async_call_reply, Ref, #async_call_reply{code = 0, message = Message}} -> {rpc_reply, Ref, #rpc_reply{code = 0, payload = Message}} ->
{error, Message} {error, Message}
after Timeout -> after Timeout ->
{error, <<"timeout">>} {error, <<"timeout">>}
@ -227,14 +224,14 @@ handle_event({call, From}, get_status, _, State = #state{channel_pid = ChannelPi
{keep_state, State, [{reply, From, {ok, Reply}}]}; {keep_state, State, [{reply, From, {ok, Reply}}]};
%% channel存在 %% channel存在
handle_event({call, From}, {async_call, ReceiverPid, PushType, PushBin}, _, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) -> handle_event({call, From}, {rpc_call, ReceiverPid, RpcCall}, _, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) ->
case HasSession andalso is_pid(ChannelPid) of case HasSession andalso is_pid(ChannelPid) of
true -> true ->
%% websocket发送请求 %% websocket发送请求
Ref = tcp_channel:async_call(ChannelPid, ReceiverPid, PushType, PushBin), Ref = tcp_channel:rpc_call(ChannelPid, ReceiverPid, RpcCall),
{keep_state, State, [{reply, From, {ok, Ref}}]}; {keep_state, State, [{reply, From, {ok, Ref}}]};
false -> false ->
lager:debug("[iot_host] uuid: ~p, publish_type: ~p, invalid state: ~p", [UUID, PushType, state_map(State)]), lager:debug("[iot_host] uuid: ~p, invalid state: ~p", [UUID, state_map(State)]),
{keep_state, State, [{reply, From, {error, <<"主机离线,发送请求失败"/utf8>>}}]} {keep_state, State, [{reply, From, {error, <<"主机离线,发送请求失败"/utf8>>}}]}
end; end;
@ -372,10 +369,6 @@ handle_event(cast, {handle, {ping, Metrics}}, ?STATE_ACTIVATED, State = #state{u
lager:debug("[iot_host] ping host_id uuid: ~p, get ping: ~p", [UUID, Metrics]), lager:debug("[iot_host] ping host_id uuid: ~p, get ping: ~p", [UUID, Metrics]),
{keep_state, State#state{metrics = Metrics}}; {keep_state, State#state{metrics = Metrics}};
handle_event(cast, {handle, {inform, #service_inform{service_id = ServiceId, status = Status, timestamp = Timestamp}}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
lager:debug("[iot_host] inform host: ~p, service_id: ~p, status: ~p, timestamp: ~p", [UUID, ServiceId, Status, Timestamp]),
{keep_state, State};
handle_event(cast, {handle, {event, #event{service_id = ServiceId, event_type = EventType, params = Params}}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) -> handle_event(cast, {handle, {event, #event{service_id = ServiceId, event_type = EventType, params = Params}}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
lager:debug("[iot_host] event uuid: ~p, service_id: ~p, event_type: ~p, params: ~p", [UUID, ServiceId, EventType, Params]), lager:debug("[iot_host] event uuid: ~p, service_id: ~p, event_type: ~p, params: ~p", [UUID, ServiceId, EventType, Params]),
%DevicePid = iot_device:get_pid(DeviceUUID), %DevicePid = iot_device:get_pid(DeviceUUID),

View File

@ -16,10 +16,10 @@
%% API %% API
-export([encode/2, decode/1]). -export([encode/2, decode/1]).
-spec encode0(Message :: any()) -> binary(). -spec encode(MessageType :: integer(), Message :: any()) -> binary().
encode(PacketType, Packet) when is_integer(PacketType) -> encode(MessageType, Message) when is_integer(MessageType) ->
Bin = encode0(Packet), Bin = encode0(Message),
<<PacketType, Bin/binary>>. <<MessageType, Bin/binary>>.
encode0(#auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}) -> encode0(#auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}) ->
iolist_to_binary([ iolist_to_binary([
marshal(?Bytes, UUID), marshal(?Bytes, UUID),
@ -28,7 +28,7 @@ encode0(#auth_request{uuid = UUID, username = Username, salt = Salt, token = Tok
marshal(?Bytes, Token), marshal(?Bytes, Token),
marshal(?I32, Timestamp) marshal(?I32, Timestamp)
]); ]);
encode0(#auth_reply{code = Code, message = Message}) -> encode0(#rpc_reply{code = Code, payload = Message}) ->
iolist_to_binary([ iolist_to_binary([
marshal(?I32, Code), marshal(?I32, Code),
marshal(?Bytes, Message) marshal(?Bytes, Message)
@ -78,8 +78,8 @@ decode(<<PacketType:8, Packet/binary>>) ->
end. end.
decode0(?MESSAGE_AUTH_REQUEST, [UUID, Username, Salt, Token, Timestamp]) -> decode0(?MESSAGE_AUTH_REQUEST, [UUID, Username, Salt, Token, Timestamp]) ->
{ok, #auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}}; {ok, #auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}};
decode0(?MESSAGE_AUTH_REPLY, [Code, Message]) -> decode0(?MESSAGE_RPC_REPLY, [Code, Message]) ->
{ok, #auth_reply{code = Code, message = Message}}; {ok, #rpc_reply{code = Code, payload = Message}};
decode0(?MESSAGE_PUB, [Topic, Content]) -> decode0(?MESSAGE_PUB, [Topic, Content]) ->
{ok, #pub{topic = Topic, content = Content}}; {ok, #pub{topic = Topic, content = Content}};
decode0(?MESSAGE_COMMAND, [CommandType, Command]) -> decode0(?MESSAGE_COMMAND, [CommandType, Command]) ->

View File

@ -11,10 +11,9 @@
-include("message.hrl"). -include("message.hrl").
%% API %% API
-export([pub/3, async_call/4, command/3]). -export([pub/3, rpc_call/3, command/3]).
-export([stop/2]).
-export([start_link/2]). -export([start_link/2, stop/2]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]).
@ -43,10 +42,10 @@ command(Pid, CommandType, Command) when is_pid(Pid), is_integer(CommandType), is
gen_server:cast(Pid, {command, CommandType, Command}). gen_server:cast(Pid, {command, CommandType, Command}).
%% %%
-spec async_call(Pid :: pid(), ReceiverPid :: pid(), CallType :: integer(), CallBin :: binary()) -> Ref :: reference(). -spec rpc_call(Pid :: pid(), ReceiverPid :: pid(), CallBin :: binary()) -> Ref :: reference().
async_call(Pid, ReceiverPid, CallType, CallBin) when is_pid(Pid), is_pid(ReceiverPid), is_integer(CallType), is_binary(CallBin) -> rpc_call(Pid, ReceiverPid, CallBin) when is_pid(Pid), is_pid(ReceiverPid), is_binary(CallBin) ->
Ref = make_ref(), Ref = make_ref(),
gen_server:cast(Pid, {async_call, ReceiverPid, Ref, CallType, CallBin}), gen_server:cast(Pid, {rpc_call, ReceiverPid, Ref, CallBin}),
Ref. Ref.
%% %%
@ -79,18 +78,19 @@ handle_call(_Request, _From, State) ->
%% , pub/sub机制 %% , pub/sub机制
handle_cast({pub, Topic, Content}, State = #state{transport = Transport, socket = Socket}) -> handle_cast({pub, Topic, Content}, State = #state{transport = Transport, socket = Socket}) ->
PubBin = message_codec:encode(?MESSAGE_PUB, #pub{topic = Topic, content = Content}), EncPub = message_codec:encode(?MESSAGE_PUB, #pub{topic = Topic, content = Content}),
Transport:send(Socket, <<?PACKET_PUB, PubBin/binary>>), Transport:send(Socket, <<?PACKET_CAST, EncPub/binary>>),
{noreply, State}; {noreply, State};
%% Command消息 %% Command消息
handle_cast({command, CommandType, Command}, State = #state{transport = Transport, socket = Socket}) -> handle_cast({command, CommandType, Command}, State = #state{transport = Transport, socket = Socket}) ->
Transport:send(Socket, <<?PACKET_COMMAND, CommandType:8, Command/binary>>), EncCommand = message_codec:encode(?MESSAGE_COMMAND, #command{command_type = CommandType, command = Command}),
Transport:send(Socket, <<?PACKET_CAST, EncCommand/binary>>),
{noreply, State}; {noreply, State};
%% %%
handle_cast({async_call, ReceiverPid, Ref, CallType, CallBin}, State = #state{transport = Transport, socket = Socket, packet_id = PacketId, inflight = Inflight}) -> handle_cast({rpc_call, ReceiverPid, Ref, CallBin}, State = #state{transport = Transport, socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
Transport:send(Socket, <<?PACKET_RPC, PacketId:32, CallType:8, CallBin/binary>>), Transport:send(Socket, <<?PACKET_REQUEST, PacketId:32, CallBin/binary>>),
{noreply, State#state{packet_id = PacketId + 1, inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}. {noreply, State#state{packet_id = PacketId + 1, inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}.
%% auth验证 %% auth验证
@ -110,20 +110,20 @@ handle_info({tcp, Socket, <<?PACKET_REQUEST, PacketId:32, RequestBin/binary>>},
ok -> ok ->
%% host的monitor %% host的monitor
erlang:monitor(process, HostPid), erlang:monitor(process, HostPid),
AuthReplyBin = message_codec:encode(?MESSAGE_AUTH_REPLY, #auth_reply{code = 0, message = <<"ok">>}), AuthReplyBin = message_codec:encode(?MESSAGE_RPC_REPLY, #rpc_reply{code = 0, payload = <<"ok">>}),
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, AuthReplyBin/binary>>), Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, AuthReplyBin/binary>>),
{noreply, State#state{uuid = UUID, host_pid = HostPid}}; {noreply, State#state{uuid = UUID, host_pid = HostPid}};
{denied, Reason} when is_binary(Reason) -> {denied, Reason} when is_binary(Reason) ->
erlang:monitor(process, HostPid), erlang:monitor(process, HostPid),
AuthReplyBin = message_codec:encode(?MESSAGE_AUTH_REPLY, #auth_reply{code = 1, message = Reason}), AuthReplyBin = message_codec:encode(?MESSAGE_RPC_REPLY, #rpc_reply{code = 1, payload = Reason}),
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, AuthReplyBin/binary>>), Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, AuthReplyBin/binary>>),
lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]), lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]),
{noreply, State#state{uuid = UUID, host_pid = HostPid}}; {noreply, State#state{uuid = UUID, host_pid = HostPid}};
{error, Reason} when is_binary(Reason) -> {error, Reason} when is_binary(Reason) ->
AuthReplyBin = message_codec:encode(?MESSAGE_AUTH_REPLY, #auth_reply{code = 2, message = Reason}), AuthReplyBin = message_codec:encode(?MESSAGE_RPC_REPLY, #rpc_reply{code = 2, payload = Reason}),
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, AuthReplyBin/binary>>), Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, AuthReplyBin/binary>>),
lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]), lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]),
@ -151,7 +151,7 @@ handle_info({tcp, Socket, <<?PACKET_CAST, CastBin/binary>>}, State = #state{sock
% {noreply, State}; % {noreply, State};
%% %%
handle_info({tcp, Socket, <<?PACKET_RPC_REPLY, PacketId:32, ResponseBin/binary>>}, State = #state{socket = Socket, uuid = UUID, inflight = Inflight}) when PacketId > 0 -> handle_info({tcp, Socket, <<?PACKET_RESPONSE, PacketId:32, ResponseBin/binary>>}, State = #state{socket = Socket, uuid = UUID, inflight = Inflight}) when PacketId > 0 ->
{ok, RpcReply} = message_codec:decode(ResponseBin), {ok, RpcReply} = message_codec:decode(ResponseBin),
case maps:take(PacketId, Inflight) of case maps:take(PacketId, Inflight) of
error -> error ->
@ -166,10 +166,6 @@ handle_info({tcp, Socket, <<?PACKET_RPC_REPLY, PacketId:32, ResponseBin/binary>>
{noreply, State#state{inflight = NInflight}} {noreply, State#state{inflight = NInflight}}
end; end;
%% efka的ping包
handle_info({tcp, Socket, <<?PACKET_PING>>}, State = #state{socket = Socket}) ->
{noreply, State};
handle_info({tcp_error, Sock, Reason}, State = #state{socket = Sock}) -> handle_info({tcp_error, Sock, Reason}, State = #state{socket = Sock}) ->
lager:notice("[sdlan_channel] tcp_error: ~p", [Reason]), lager:notice("[sdlan_channel] tcp_error: ~p", [Reason]),
{stop, normal, State}; {stop, normal, State};