diff --git a/apps/iot/src/mocker/iot_mqtt_consumer.erl b/apps/iot/src/mocker/iot_mqtt_consumer.erl deleted file mode 100644 index bef47e5..0000000 --- a/apps/iot/src/mocker/iot_mqtt_consumer.erl +++ /dev/null @@ -1,281 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% 1. 需要考虑集群部署的相关问题,上行的数据可能在集群中共享 -%%% 2. host进程不能直接去监听topic,这样涉及到新增和下线的很多问题 -%%% @end -%%% Created : 12. 3月 2023 21:27 -%%%------------------------------------------------------------------- --module(iot_mqtt_consumer). --author("aresei"). --include("iot.hrl"). - --behaviour(gen_server). - -%% API --export([start_link/0]). --export([mock/5]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --define(SERVER, ?MODULE). --define(RETRY_INTERVAL, 5000). - -%% 执行超时时间 --define(EXECUTE_TIMEOUT, 10 * 1000). - -%% 需要订阅的主题信息 --define(Topics,[ - {<<"CET/NX/download">>, 2} -]). - --record(state, { - conn_pid :: undefined | pid(), - logger_pid :: pid(), - mqtt_props :: list(), - %% 执行中的任务数 - flight_num = 0 -}). - -%%%=================================================================== -%%% API -%%%=================================================================== - -mock(LocationCode, Para, SType, CType, Value) when is_binary(LocationCode), is_integer(SType), is_integer(CType), is_integer(Para) -> - Req = #{ - <<"version">> => <<"1.0">>, - <<"ts">> => iot_util:current_time(), - <<"properties">> => #{ - <<"type">> => <<"ctrl">>, - <<"para">> => Para, - <<"stype">> => SType, - <<"ctype">> => CType, - <<"value">> => Value, - <<"timestamp">> => iot_util:current_time() - }, - <<"location_code">> => LocationCode - }, - gen_server:call(?MODULE, {mock, Req}). - -%% @doc Spawns the server and registers the local name (unique) --spec(start_link() -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([]) -> - erlang:process_flag(trap_exit, true), - - {ok, Props} = application:get_env(iot, zhongdian), - %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 - erlang:start_timer(0, self(), create_consumer), - %% 启动日志记录器 - {ok, LoggerPid} = iot_logger:start_link("zd_directive_data"), - - {ok, #state{mqtt_props = Props, conn_pid = undefined, logger_pid = LoggerPid}}. - -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call({mock, Request}, _From, State = #state{conn_pid = ConnPid, flight_num = FlightNum}) when is_pid(ConnPid) -> - publish_directive(Request, jiffy:encode(Request, [force_utf8])), - {reply, ok, State#state{flight_num = FlightNum + 1}}. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_cast(_Request, State = #state{}) -> - {noreply, State}. - -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_info({disconnect, ReasonCode, Properties}, State) -> - lager:debug("[iot_zd_consumer] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]), - {stop, disconnected, State}; -%% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行 -handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := 2, topic := Topic}}, State = #state{flight_num = FlightNum}) -> - lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: 2", [Topic, Payload]), - - Request = catch jiffy:decode(Payload, [return_maps]), - publish_directive(Request, Payload), - - {noreply, State#state{flight_num = FlightNum + 1}}; - -handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) -> - lager:notice("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: ~p, qos is error", [Topic, Payload, Qos]), - {noreply, State}; - -handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) -> - lager:debug("[iot_zd_consumer] receive puback packet: ~p", [Packet]), - {noreply, State}; - -handle_info({timeout, _, create_consumer}, State = #state{mqtt_props = Props, conn_pid = undefined}) -> - try - {ok, ConnPid} = create_consumer(Props), - {noreply, State#state{conn_pid = ConnPid}} - catch _:Error:Stack -> - lager:warning("[iot_zd_consumer] config: ~p, create consumer get error: ~p, stack: ~p", [Props, Error, Stack]), - erlang:start_timer(?RETRY_INTERVAL, self(), create_consumer), - {noreply, State#state{conn_pid = undefined}} - end; - -%% postman进程挂掉时,重新建立新的 -handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) -> - lager:warning("[iot_zd_consumer] consumer exited with reason: ~p", [Reason]), - erlang:start_timer(?RETRY_INTERVAL, self(), create_consumer), - - {noreply, State#state{conn_pid = undefined}}; - -handle_info({'EXIT', LoggerPid, Reason}, State = #state{logger_pid = LoggerPid}) -> - lager:warning("[iot_zd_consumer] logger exited with reason: ~p", [Reason]), - {ok, LoggerPid} = iot_logger:start_link("zd_directive_data"), - - {noreply, State#state{logger_pid = LoggerPid}}; - -handle_info({directive_reply, Reply}, State = #state{logger_pid = LoggerPid, flight_num = FlightNum}) -> - FlightInfo = <<"flight_num: ", (integer_to_binary(FlightNum - 1))/binary>>, - case Reply of - {ok, RawReq, DirectiveResult} -> - case DirectiveResult of - ok -> - iot_logger:write(LoggerPid, [<<"[success]">>, RawReq, <<"OK">>, FlightInfo]); - {ok, Response} when is_binary(Response) -> - iot_logger:write(LoggerPid, [<<"[success]">>, RawReq, Response, FlightInfo]); - {error, Reason0} -> - Reason = if - is_atom(Reason0) -> atom_to_binary(Reason0); - is_binary(Reason0) -> Reason0; - true -> <<"Unknow error">> - end, - iot_logger:write(LoggerPid, [<<"[error]">>, RawReq, Reason, FlightInfo]) - end; - {error, RawReq, Error} when is_binary(Error) -> - iot_logger:write(LoggerPid, [<<"[error]">>, RawReq, Error, FlightInfo]) - end, - {noreply, State#state{flight_num = FlightNum - 1}}; - -handle_info(Info, State = #state{}) -> - lager:notice("[iot_zd_consumer] get a unknown info: ~p", [Info]), - {noreply, State}. - -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(Reason, _State = #state{conn_pid = ConnPid}) when is_pid(ConnPid) -> - %% 取消topic的订阅 - TopicNames = lists:map(fun({Name, _}) -> Name end, ?Topics), - {ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, TopicNames), - - ok = emqtt:disconnect(ConnPid), - lager:debug("[iot_zd_consumer] terminate with reason: ~p", [Reason]), - ok; -terminate(Reason, _State) -> - lager:debug("[iot_zd_consumer] terminate with reason: ~p", [Reason]), - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== - -publish_directive(#{<<"version">> := Version, <<"location_code">> := LocationCode, <<"properties">> := DirectiveParams}, RawReq) -> - %% 通过LocationCode查找到主机和Device_uuid - ReceiverPid = self(), - case redis_client:hgetall(LocationCode) of - {ok, #{<<"host_uuid">> := HostUUID, <<"device_uuid">> := DeviceUUID}} -> - case iot_host:get_pid(HostUUID) of - undefined -> - ReceiverPid ! {directive_reply, {error, RawReq, <<"host uuid: ", HostUUID/binary, " not found">>}}; - Pid when is_pid(Pid) -> - ok - end; - {ok, Map} when is_map(Map) -> - RedisData = iolist_to_binary(jiffy:encode(Map, [force_utf8])), - ReceiverPid ! {directive_reply, {error, RawReq, <<"invalid redis data: ", RedisData/binary>>}}; - _ -> - ReceiverPid ! {directive_reply, {error, RawReq, <<"location_code: ", LocationCode/binary, " not found in redis">>}} - end; -publish_directive(Other, RawReq) -> - lager:warning("[iot_zd_consumer] get a error message: ~p", [Other]), - self() ! {directive_reply, {error, RawReq, <<"unknown directive">>}}. - --spec create_consumer(Props :: list()) -> {ok, ConnPid :: pid()} | {error, Reason :: any()}. -create_consumer(Props) when is_list(Props) -> - Node = atom_to_binary(node()), - ClientId = <<"mqtt-client-", Node/binary, "-zhongdian_mqtt_consumer">>, - - %% 建立到emqx服务器的连接 - Host = proplists:get_value(host, Props), - Port = proplists:get_value(port, Props, 18080), - Username = proplists:get_value(username, Props), - Password = proplists:get_value(password, Props), - Keepalive = proplists:get_value(keepalive, Props, 86400), - - Opts = [ - {clientid, ClientId}, - {host, Host}, - {port, Port}, - {owner, self()}, - {tcp_opts, []}, - {username, Username}, - {password, Password}, - {keepalive, Keepalive}, - {auto_ack, true}, - {connect_timeout, 5000}, - {proto_ver, v5}, - {retry_interval, 5000} - ], - - %% 建立到emqx服务器的连接 - lager:debug("[iot_zd_consumer] opts is: ~p", [Opts]), - case emqtt:start_link(Opts) of - {ok, ConnPid} -> - %% 监听和host相关的全部事件 - lager:debug("[iot_zd_consumer] start conntecting, pid: ~p", [ConnPid]), - {ok, _} = emqtt:connect(ConnPid), - lager:debug("[iot_zd_consumer] connect success, pid: ~p", [ConnPid]), - SubscribeResult = emqtt:subscribe(ConnPid, ?Topics), - lager:debug("[iot_zd_consumer] subscribe topics: ~p, result is: ~p", [?Topics, SubscribeResult]), - - {ok, ConnPid}; - ignore -> - {error, ignore}; - {error, Reason} -> - {error, Reason} - end. \ No newline at end of file