remove mqtt consumer

This commit is contained in:
anlicheng 2025-10-31 14:51:42 +08:00
parent e5322939b9
commit 5d20300b50

View File

@ -1,281 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%% 1.
%%% 2. host进程不能直接去监听topic线
%%% @end
%%% Created : 12. 3 2023 21:27
%%%-------------------------------------------------------------------
-module(iot_mqtt_consumer).
-author("aresei").
-include("iot.hrl").
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([mock/5]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-define(RETRY_INTERVAL, 5000).
%%
-define(EXECUTE_TIMEOUT, 10 * 1000).
%%
-define(Topics,[
{<<"CET/NX/download">>, 2}
]).
-record(state, {
conn_pid :: undefined | pid(),
logger_pid :: pid(),
mqtt_props :: list(),
%%
flight_num = 0
}).
%%%===================================================================
%%% API
%%%===================================================================
mock(LocationCode, Para, SType, CType, Value) when is_binary(LocationCode), is_integer(SType), is_integer(CType), is_integer(Para) ->
Req = #{
<<"version">> => <<"1.0">>,
<<"ts">> => iot_util:current_time(),
<<"properties">> => #{
<<"type">> => <<"ctrl">>,
<<"para">> => Para,
<<"stype">> => SType,
<<"ctype">> => CType,
<<"value">> => Value,
<<"timestamp">> => iot_util:current_time()
},
<<"location_code">> => LocationCode
},
gen_server:call(?MODULE, {mock, Req}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
erlang:process_flag(trap_exit, true),
{ok, Props} = application:get_env(iot, zhongdian),
%% ,
erlang:start_timer(0, self(), create_consumer),
%%
{ok, LoggerPid} = iot_logger:start_link("zd_directive_data"),
{ok, #state{mqtt_props = Props, conn_pid = undefined, logger_pid = LoggerPid}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call({mock, Request}, _From, State = #state{conn_pid = ConnPid, flight_num = FlightNum}) when is_pid(ConnPid) ->
publish_directive(Request, jiffy:encode(Request, [force_utf8])),
{reply, ok, State#state{flight_num = FlightNum + 1}}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast(_Request, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({disconnect, ReasonCode, Properties}, State) ->
lager:debug("[iot_zd_consumer] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]),
{stop, disconnected, State};
%% json反序列需要在host进程进行
handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := 2, topic := Topic}}, State = #state{flight_num = FlightNum}) ->
lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: 2", [Topic, Payload]),
Request = catch jiffy:decode(Payload, [return_maps]),
publish_directive(Request, Payload),
{noreply, State#state{flight_num = FlightNum + 1}};
handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) ->
lager:notice("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: ~p, qos is error", [Topic, Payload, Qos]),
{noreply, State};
handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) ->
lager:debug("[iot_zd_consumer] receive puback packet: ~p", [Packet]),
{noreply, State};
handle_info({timeout, _, create_consumer}, State = #state{mqtt_props = Props, conn_pid = undefined}) ->
try
{ok, ConnPid} = create_consumer(Props),
{noreply, State#state{conn_pid = ConnPid}}
catch _:Error:Stack ->
lager:warning("[iot_zd_consumer] config: ~p, create consumer get error: ~p, stack: ~p", [Props, Error, Stack]),
erlang:start_timer(?RETRY_INTERVAL, self(), create_consumer),
{noreply, State#state{conn_pid = undefined}}
end;
%% postman进程挂掉时
handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) ->
lager:warning("[iot_zd_consumer] consumer exited with reason: ~p", [Reason]),
erlang:start_timer(?RETRY_INTERVAL, self(), create_consumer),
{noreply, State#state{conn_pid = undefined}};
handle_info({'EXIT', LoggerPid, Reason}, State = #state{logger_pid = LoggerPid}) ->
lager:warning("[iot_zd_consumer] logger exited with reason: ~p", [Reason]),
{ok, LoggerPid} = iot_logger:start_link("zd_directive_data"),
{noreply, State#state{logger_pid = LoggerPid}};
handle_info({directive_reply, Reply}, State = #state{logger_pid = LoggerPid, flight_num = FlightNum}) ->
FlightInfo = <<"flight_num: ", (integer_to_binary(FlightNum - 1))/binary>>,
case Reply of
{ok, RawReq, DirectiveResult} ->
case DirectiveResult of
ok ->
iot_logger:write(LoggerPid, [<<"[success]">>, RawReq, <<"OK">>, FlightInfo]);
{ok, Response} when is_binary(Response) ->
iot_logger:write(LoggerPid, [<<"[success]">>, RawReq, Response, FlightInfo]);
{error, Reason0} ->
Reason = if
is_atom(Reason0) -> atom_to_binary(Reason0);
is_binary(Reason0) -> Reason0;
true -> <<"Unknow error">>
end,
iot_logger:write(LoggerPid, [<<"[error]">>, RawReq, Reason, FlightInfo])
end;
{error, RawReq, Error} when is_binary(Error) ->
iot_logger:write(LoggerPid, [<<"[error]">>, RawReq, Error, FlightInfo])
end,
{noreply, State#state{flight_num = FlightNum - 1}};
handle_info(Info, State = #state{}) ->
lager:notice("[iot_zd_consumer] get a unknown info: ~p", [Info]),
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(Reason, _State = #state{conn_pid = ConnPid}) when is_pid(ConnPid) ->
%% topic的订阅
TopicNames = lists:map(fun({Name, _}) -> Name end, ?Topics),
{ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, TopicNames),
ok = emqtt:disconnect(ConnPid),
lager:debug("[iot_zd_consumer] terminate with reason: ~p", [Reason]),
ok;
terminate(Reason, _State) ->
lager:debug("[iot_zd_consumer] terminate with reason: ~p", [Reason]),
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
publish_directive(#{<<"version">> := Version, <<"location_code">> := LocationCode, <<"properties">> := DirectiveParams}, RawReq) ->
%% LocationCode查找到主机和Device_uuid
ReceiverPid = self(),
case redis_client:hgetall(LocationCode) of
{ok, #{<<"host_uuid">> := HostUUID, <<"device_uuid">> := DeviceUUID}} ->
case iot_host:get_pid(HostUUID) of
undefined ->
ReceiverPid ! {directive_reply, {error, RawReq, <<"host uuid: ", HostUUID/binary, " not found">>}};
Pid when is_pid(Pid) ->
ok
end;
{ok, Map} when is_map(Map) ->
RedisData = iolist_to_binary(jiffy:encode(Map, [force_utf8])),
ReceiverPid ! {directive_reply, {error, RawReq, <<"invalid redis data: ", RedisData/binary>>}};
_ ->
ReceiverPid ! {directive_reply, {error, RawReq, <<"location_code: ", LocationCode/binary, " not found in redis">>}}
end;
publish_directive(Other, RawReq) ->
lager:warning("[iot_zd_consumer] get a error message: ~p", [Other]),
self() ! {directive_reply, {error, RawReq, <<"unknown directive">>}}.
-spec create_consumer(Props :: list()) -> {ok, ConnPid :: pid()} | {error, Reason :: any()}.
create_consumer(Props) when is_list(Props) ->
Node = atom_to_binary(node()),
ClientId = <<"mqtt-client-", Node/binary, "-zhongdian_mqtt_consumer">>,
%% emqx服务器的连接
Host = proplists:get_value(host, Props),
Port = proplists:get_value(port, Props, 18080),
Username = proplists:get_value(username, Props),
Password = proplists:get_value(password, Props),
Keepalive = proplists:get_value(keepalive, Props, 86400),
Opts = [
{clientid, ClientId},
{host, Host},
{port, Port},
{owner, self()},
{tcp_opts, []},
{username, Username},
{password, Password},
{keepalive, Keepalive},
{auto_ack, true},
{connect_timeout, 5000},
{proto_ver, v5},
{retry_interval, 5000}
],
%% emqx服务器的连接
lager:debug("[iot_zd_consumer] opts is: ~p", [Opts]),
case emqtt:start_link(Opts) of
{ok, ConnPid} ->
%% host相关的全部事件
lager:debug("[iot_zd_consumer] start conntecting, pid: ~p", [ConnPid]),
{ok, _} = emqtt:connect(ConnPid),
lager:debug("[iot_zd_consumer] connect success, pid: ~p", [ConnPid]),
SubscribeResult = emqtt:subscribe(ConnPid, ?Topics),
lager:debug("[iot_zd_consumer] subscribe topics: ~p, result is: ~p", [?Topics, SubscribeResult]),
{ok, ConnPid};
ignore ->
{error, ignore};
{error, Reason} ->
{error, Reason}
end.