%%%------------------------------------------------------------------- %%% @author licheng5 %%% @copyright (C) 2021, %%% @doc %%% %%% @end %%% Created : 11. 1月 2021 上午12:17 %%%------------------------------------------------------------------- -module(ws_channel). -author("licheng5"). -include("efka_service.hrl"). %% API -export([init/2]). -export([websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]). %% 最大的等待时间 -define(PENDING_TIMEOUT, 10 * 1000). -record(state, { packet_id = 1, service_id :: undefined | binary(), service_pid :: undefined | pid(), is_registered = false :: boolean(), %% 请求响应的对应关系, #{packet_id => {ReceiverPid, Ref}}; 自身的inflight需要超时逻辑处理 inflight = #{} }). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% 逻辑处理方法 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% init(Req, Opts) -> {cowboy_websocket, Req, Opts}. websocket_init(_State) -> lager:debug("[ws_channel] get a new connection"), %% 初始状态为true {ok, #state{packet_id = 1}}. websocket_handle({binary, <>}, State) -> Request = jiffy:decode(Data, [return_maps]), handle_request(Request, State); %% 处理micro-client:response => efka 的响应 websocket_handle({binary, <>}, State = #state{inflight = Inflight}) -> Resp = jiffy:decode(Data, [return_maps]), {PacketId, Reply} = case Resp of #{<<"id">> := Id, <<"result">> := Result} -> {Id, {ok, Result}}; #{<<"id">> := Id, <<"error">> := #{<<"code">> := _Code, <<"message">> := Error}} -> {Id, {error, Error}} end, case maps:take(PacketId, Inflight) of error -> lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, PacketId]), {ok, State}; {{ReceiverPid, Ref}, NInflight} -> case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of true -> ReceiverPid ! {channel_reply, Ref, Reply}; false -> lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, PacketId]) end, {ok, State#state{inflight = NInflight}} end; websocket_handle(Info, State) -> lager:error("[ws_channel] get a unknown message: ~p, channel will closed", [Info]), {stop, State}. websocket_info({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{packet_id = PacketId, inflight = Inflight}) -> PushConfig = #{<<"id">> => PacketId, <<"method">> => <<"push_config">>, <<"params">> => #{<<"config">> => ConfigJson}}, Packet = jiffy:encode(PushConfig, [force_utf8]), Reply = <>, erlang:start_timer(?PENDING_TIMEOUT, self(), {pending_timeout, PacketId}), {reply, {binary, Reply}, State#state{packet_id = gen_channel:next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; %% 远程调用 websocket_info({invoke, Ref, ReceiverPid, Payload}, State = #state{packet_id = PacketId, inflight = Inflight}) -> PushConfig = #{<<"id">> => PacketId, <<"method">> => <<"invoke">>, <<"params">> => #{<<"payload">> => Payload}}, Packet = jiffy:encode(PushConfig, [force_utf8]), Reply = <>, erlang:start_timer(?PENDING_TIMEOUT, self(), {pending_timeout, PacketId}), {reply, {binary, Reply}, State#state{packet_id = gen_channel:next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; %% 订阅的消息 websocket_info({topic_broadcast, Topic, Content}, State = #state{}) -> Packet = jiffy:encode(#{<<"topic">> => Topic, <<"content">> => Content}, [force_utf8]), Reply = <>, {reply, {binary, Reply}, State}; %% 超时逻辑处理 websocket_info({timeout, _, {pending_timeout, Id}}, State = #state{inflight = Inflight}) -> case maps:take(Id, Inflight) of error -> {ok, State}; {{ReceiverPid, Ref}, NInflight} -> case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of true -> ReceiverPid ! {channel_reply, Ref, {error, <<"timeout">>}}; false -> ok end, {ok, State#state{inflight = NInflight}} end; %% service进程关闭 websocket_info({'DOWN', _Ref, process, ServicePid, Reason}, State = #state{service_pid = ServicePid}) -> lager:debug("[tcp_channel] service_pid: ~p, exited: ~p", [ServicePid, Reason]), {stop, State#state{service_pid = undefined}}; websocket_info({timeout, _, {stop, Reason}}, State) -> lager:debug("[ws_channel] the channel will be closed with reason: ~p", [Reason]), {stop, State}; %% 处理关闭信号 websocket_info({stop, Reason}, State) -> lager:debug("[ws_channel] the channel will be closed with reason: ~p", [Reason]), {stop, State}; %% 处理其他未知消息 websocket_info(Info, State) -> lager:debug("[ws_channel] channel get unknown info: ~p", [Info]), {ok, State}. %% 进程关闭事件 terminate(Reason, _Req, State) -> lager:debug("[ws_channel] channel close with reason: ~p, state is: ~p", [Reason, State]), ok. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% helper methods %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% 注册 handle_request(#{<<"id">> := Id, <<"method">> := <<"register">>, <<"params">> := #{<<"service_id">> := ServiceId}}, State) -> case efka_service:get_pid(ServiceId) of undefined -> lager:warning("[efka_tcp_channel] service_id: ~p, not running", [ServiceId]), Packet = gen_channel:json_error(Id, -1, <<"service not running">>), Reply = <>, delay_stop(10, normal), {reply, {binary, Reply}, State}; ServicePid when is_pid(ServicePid) -> case efka_service:attach_channel(ServicePid, self()) of ok -> Packet = gen_channel:json_result(Id, <<"ok">>), erlang:monitor(process, ServicePid), Reply = <>, {reply, {binary, Reply}, State#state{service_id = ServiceId, service_pid = ServicePid, is_registered = true}}; {error, Error} -> lager:warning("[efka_tcp_channel] service_id: ~p, attach_channel get error: ~p", [ServiceId, Error]), Packet = gen_channel:json_error(Id, -1, Error), Reply = <>, delay_stop(10, normal), {reply, {binary, Reply}, State} end end; %% 请求参数 handle_request(#{<<"id">> := Id, <<"method">> := <<"request_config">>}, State = #state{service_pid = ServicePid, is_registered = true}) -> {ok, ConfigJson} = efka_service:request_config(ServicePid), Packet = gen_channel:json_result(Id, ConfigJson), Reply = <>, {reply, {binary, Reply}, State}; %% 数据项 handle_request(#{<<"id">> := 0, <<"method">> := <<"metric_data">>, <<"params">> := #{<<"device_uuid">> := DeviceUUID, <<"route_key">> := RouteKey, <<"metric">> := Metric}}, State = #state{service_pid = ServicePid, is_registered = true}) -> efka_service:metric_data(ServicePid, DeviceUUID, RouteKey, Metric), {ok, State}; %% Event事件 handle_request(#{<<"id">> := 0, <<"method">> := <<"event">>, <<"params">> := #{<<"event_type">> := EventType, <<"body">> := Body}}, State = #state{service_pid = ServicePid, is_registered = true}) -> efka_service:send_event(ServicePid, EventType, Body), {ok, State}; %% 订阅事件 handle_request(#{<<"id">> := 0, <<"method">> := <<"subscribe">>, <<"params">> := #{<<"topic">> := Topic}}, State = #state{is_registered = true}) -> efka_subscription:subscribe(Topic, self()), {ok, State}. delay_stop(Timeout, Reason) when is_integer(Timeout) -> erlang:start_timer(Timeout, self(), {stop, Reason}).