%%%------------------------------------------------------------------- %%% @author licheng5 %%% @copyright (C) 2021, %%% @doc %%% %%% @end %%% Created : 11. 1月 2021 上午12:17 %%%------------------------------------------------------------------- -module(ws_channel). -author("licheng5"). -include("efka_service.hrl"). %% API -export([init/2]). -export([websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]). %% 最大的等待时间 -define(PENDING_TIMEOUT, 10 * 1000). -record(state, { service_pid :: undefined | pid(), is_registered = false :: boolean() }). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% 逻辑处理方法 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% init(Req, Opts) -> {cowboy_websocket, Req, Opts}. websocket_init(_State) -> lager:debug("[ws_channel] get a new connection"), %% 初始状态为true {ok, #state{}}. websocket_handle(ping, State) -> {reply, pong, State}; websocket_handle({text, Data}, State) -> Request = jiffy:decode(Data, [return_maps]), lager:debug("[ws_channle] get request: ~p", [Request]), handle_request(Request, State); websocket_handle(Info, State) -> lager:error("[ws_channel] get a unknown message: ~p, channel will closed", [Info]), {stop, State}. %% 订阅的消息 websocket_info({topic_broadcast, Topic, Content}, State = #state{}) -> Req = iolist_to_binary(jiffy:encode(#{ <<"method">> => <<"publish">>, <<"params">> => #{<<"topic">> => Topic, <<"content">> => Content} }, [force_utf8])), {reply, {text, Req}, State}; %% service进程关闭 websocket_info({'DOWN', _Ref, process, ServicePid, Reason}, State = #state{service_pid = ServicePid}) -> lager:debug("[ws_channel] container_pid: ~p, exited: ~p", [ServicePid, Reason]), {stop, State#state{service_pid = undefined}}; %% 处理关闭信号 websocket_info({stop, Reason}, State) -> lager:debug("[ws_channel] the channel will be closed with reason: ~p", [Reason]), {stop, State}; %% 处理其他未知消息 websocket_info(Info, State) -> lager:debug("[ws_channel] channel get unknown info: ~p", [Info]), {ok, State}. %% 进程关闭事件 terminate(Reason, _Req, State) -> lager:debug("[ws_channel] channel close with reason: ~p, state is: ~p", [Reason, State]), ok. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% helper methods %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% 注册, 要建立程序和容器之间的关系 handle_request(#{<<"id">> := Id, <<"method">> := <<"register">>, <<"params">> := #{<<"service_id">> := ServiceId}}, State) -> {ok, ServicePid} = efka_service_sup:start_service(ServiceId), case efka_service:attach_channel(ServicePid, self()) of ok -> Reply = json_result(Id, <<"ok">>), erlang:monitor(process, ServicePid), {reply, {text, Reply}, State#state{service_pid = ServicePid, is_registered = true}}; {error, Error} -> lager:warning("[ws_channel] service_id: ~p, attach_channel get error: ~p", [ServiceId, Error]), {stop, State} end; %% 订阅事件 handle_request(#{<<"id">> := Id, <<"method">> := <<"subscribe">>, <<"params">> := #{<<"topic">> := Topic}}, State = #state{is_registered = true}) -> Reply = case efka_subscription:subscribe(Topic, self()) of ok -> json_result(Id, <<"ok">>); {error, Reason} -> json_error(Id, -1, Reason) end, {reply, {text, Reply}, State}; %% 数据项 handle_request(#{<<"method">> := <<"metric_data">>, <<"params">> := #{<<"device_uuid">> := DeviceUUID, <<"route_key">> := RouteKey, <<"metric">> := Metric}}, State = #state{service_pid = ServicePid, is_registered = true}) -> efka_service:metric_data(ServicePid, DeviceUUID, RouteKey, Metric), {ok, State}; %% Event事件 handle_request(#{<<"method">> := <<"event">>, <<"params">> := #{<<"event_type">> := EventType, <<"body">> := Body}}, State = #state{service_pid = ServicePid, is_registered = true}) -> efka_service:send_event(ServicePid, EventType, Body), {ok, State}. -spec json_result(Id :: integer(), Result :: term()) -> binary(). json_result(Id, Result) when is_integer(Id) -> Response = #{ <<"id">> => Id, <<"result">> => Result }, jiffy:encode(Response, [force_utf8]). -spec json_error(Id :: integer(), Code :: integer(), Message :: binary()) -> binary(). json_error(Id, Code, Message) when is_integer(Id), is_integer(Code), is_binary(Message) -> Response = #{ <<"id">> => Id, <<"error">> => #{<<"code">> => Code, <<"message">> => Message} }, jiffy:encode(Response, [force_utf8]).