fix efka_client
This commit is contained in:
parent
97cb064943
commit
c2eb509706
@ -22,10 +22,12 @@
|
|||||||
%% 上传数据
|
%% 上传数据
|
||||||
-define(PACKET_PUSH, 16#03).
|
-define(PACKET_PUSH, 16#03).
|
||||||
|
|
||||||
|
-define(PACKET_PUB, 16#04).
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/3]).
|
-export([start_link/3]).
|
||||||
-export([device_offline/1, device_online/1]).
|
-export([device_offline/1, device_online/1]).
|
||||||
-export([send_metric_data/4, request_config/0, send_event/2, controller_process/1]).
|
-export([send_metric_data/4, request_config/0, send_event/2, controller_process/1, subscribe/1]).
|
||||||
|
|
||||||
-export([test/0]).
|
-export([test/0]).
|
||||||
|
|
||||||
@ -53,6 +55,10 @@ controller_process(ControllerPid) when is_pid(ControllerPid) ->
|
|||||||
send_metric_data(DeviceUUID, Measurement, Tags, Fields) when is_binary(DeviceUUID), is_binary(Measurement), is_map(Fields), is_map(Tags) ->
|
send_metric_data(DeviceUUID, Measurement, Tags, Fields) when is_binary(DeviceUUID), is_binary(Measurement), is_map(Fields), is_map(Tags) ->
|
||||||
gen_server:cast(?MODULE, {send_metric_data, DeviceUUID, Measurement, Tags, Fields}).
|
gen_server:cast(?MODULE, {send_metric_data, DeviceUUID, Measurement, Tags, Fields}).
|
||||||
|
|
||||||
|
-spec subscribe(Topic :: binary()) -> no_return().
|
||||||
|
subscribe(Topic) when is_binary(Topic) ->
|
||||||
|
gen_server:cast(?MODULE, {subscribe, Topic}).
|
||||||
|
|
||||||
%% efka_server为了统一,r对象为字符串;需要2次json_decode
|
%% efka_server为了统一,r对象为字符串;需要2次json_decode
|
||||||
-spec request_config() -> {ok, Result :: list() | map()} | {error, Reason :: any()}.
|
-spec request_config() -> {ok, Result :: list() | map()} | {error, Reason :: any()}.
|
||||||
request_config() ->
|
request_config() ->
|
||||||
@ -103,8 +109,11 @@ init([ServiceId, Host, Port]) ->
|
|||||||
ok = gen_tcp:controlling_process(Socket, self()),
|
ok = gen_tcp:controlling_process(Socket, self()),
|
||||||
|
|
||||||
PacketId = 1,
|
PacketId = 1,
|
||||||
Register = #{<<"id">> => PacketId, <<"method">> => <<"register">>, <<"params">> => #{<<"service_id">> => ServiceId}},
|
Packet = jiffy:encode(#{
|
||||||
Packet = jiffy:encode(Register, [force_utf8]),
|
<<"id">> => PacketId,
|
||||||
|
<<"method">> => <<"register">>,
|
||||||
|
<<"params">> => #{<<"service_id">> => ServiceId}
|
||||||
|
}, [force_utf8]),
|
||||||
|
|
||||||
ok = gen_tcp:send(Socket, <<?PACKET_REQUEST, Packet/binary>>),
|
ok = gen_tcp:send(Socket, <<?PACKET_REQUEST, Packet/binary>>),
|
||||||
lager:debug("[efka_client] will send packet: ~p", [Packet]),
|
lager:debug("[efka_client] will send packet: ~p", [Packet]),
|
||||||
@ -136,10 +145,11 @@ handle_call({controller_process, ControllerPid}, _From, State) ->
|
|||||||
|
|
||||||
%% done
|
%% done
|
||||||
handle_call({request_config, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
|
handle_call({request_config, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
|
||||||
RequestConfig = #{<<"id">> => PacketId, <<"method">> => <<"request_config">>},
|
Packet = jiffy:encode(#{<<"id">> => PacketId, <<"method">> => <<"request_config">>}, [force_utf8]),
|
||||||
Packet = jiffy:encode(RequestConfig, [force_utf8]),
|
|
||||||
ok = gen_tcp:send(Socket, <<?PACKET_REQUEST, Packet/binary>>),
|
ok = gen_tcp:send(Socket, <<?PACKET_REQUEST, Packet/binary>>),
|
||||||
|
|
||||||
|
erlang:start_timer(?EFKA_REQUEST_TIMEOUT, self(), {request_timeout, PacketId}),
|
||||||
|
|
||||||
Ref = make_ref(),
|
Ref = make_ref(),
|
||||||
{reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}.
|
{reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}.
|
||||||
|
|
||||||
@ -155,35 +165,44 @@ handle_cast({send_metric_data, DeviceUUID, Measurement, Tags, Fields}, State = #
|
|||||||
Point = efka_point:new(Measurement, Tags, Fields, efka_util:timestamp()),
|
Point = efka_point:new(Measurement, Tags, Fields, efka_util:timestamp()),
|
||||||
Body = efka_point:normalized(Point),
|
Body = efka_point:normalized(Point),
|
||||||
|
|
||||||
MetricData = #{
|
Packet = jiffy:encode(#{
|
||||||
<<"id">> => 0,
|
<<"id">> => 0,
|
||||||
<<"method">> => <<"metric_data">>,
|
<<"method">> => <<"metric_data">>,
|
||||||
<<"params">> => #{
|
<<"params">> => #{
|
||||||
<<"device_uuid">> => DeviceUUID,
|
<<"device_uuid">> => DeviceUUID,
|
||||||
<<"metric">> => Body
|
<<"metric">> => Body
|
||||||
}
|
}
|
||||||
},
|
}, [force_utf8]),
|
||||||
Packet = jiffy:encode(MetricData, [force_utf8]),
|
|
||||||
ok = gen_tcp:send(Socket, <<?PACKET_REQUEST, Packet/binary>>),
|
ok = gen_tcp:send(Socket, <<?PACKET_REQUEST, Packet/binary>>),
|
||||||
|
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
%% done
|
%% done
|
||||||
handle_cast({send_event, EventType, Body}, State = #state{socket = Socket}) ->
|
handle_cast({send_event, EventType, Body}, State = #state{socket = Socket}) ->
|
||||||
Event = #{
|
Packet = jiffy:encode(#{
|
||||||
<<"id">> => 0,
|
<<"id">> => 0,
|
||||||
<<"method">> => <<"event">>,
|
<<"method">> => <<"event">>,
|
||||||
<<"params">> => #{
|
<<"params">> => #{
|
||||||
<<"event_type">> => EventType,
|
<<"event_type">> => EventType,
|
||||||
<<"body">> => Body
|
<<"body">> => Body
|
||||||
}
|
}
|
||||||
},
|
}, [force_utf8]),
|
||||||
|
|
||||||
Packet = jiffy:encode(Event, [force_utf8]),
|
|
||||||
ok = gen_tcp:send(Socket, <<?PACKET_REQUEST, Packet/binary>>),
|
ok = gen_tcp:send(Socket, <<?PACKET_REQUEST, Packet/binary>>),
|
||||||
|
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
|
handle_cast({subscribe, Topic}, State = #state{socket = Socket}) ->
|
||||||
|
Packet = jiffy:encode(#{
|
||||||
|
<<"id">> => 0,
|
||||||
|
<<"method">> => <<"subscribe">>,
|
||||||
|
<<"params">> => #{
|
||||||
|
<<"topic">> => Topic
|
||||||
|
}
|
||||||
|
}, [force_utf8]),
|
||||||
|
|
||||||
|
ok = gen_tcp:send(Socket, <<?PACKET_REQUEST, Packet/binary>>),
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
handle_cast(_Info, State = #state{}) ->
|
handle_cast(_Info, State = #state{}) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
@ -215,10 +234,20 @@ handle_info({tcp, Socket, <<?PACKET_RESPONSE, Packet/binary>>}, State = #state{s
|
|||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% 收到efka推送的参数设置
|
%% 请求超时
|
||||||
|
handle_info({timeout, _, {request_timeout, PacketId}}, State = #state{inflight = Inflight}) ->
|
||||||
|
case maps:take(PacketId, Inflight) of
|
||||||
|
error ->
|
||||||
|
{noreply, State};
|
||||||
|
{{Ref, ReceiverPid}, NInflight} ->
|
||||||
|
ReceiverPid ! {response, Ref, {error, {-1, <<"request timeout">>}}},
|
||||||
|
{noreply, State#state{inflight = NInflight}}
|
||||||
|
end;
|
||||||
|
|
||||||
|
%% 收到efka推送的参数设置, 必须处理该消息;不设置超时
|
||||||
handle_info({tcp, Socket, <<?PACKET_PUSH, Packet/binary>>}, State = #state{socket = Socket, controller_process = ControllerPid}) ->
|
handle_info({tcp, Socket, <<?PACKET_PUSH, Packet/binary>>}, State = #state{socket = Socket, controller_process = ControllerPid}) ->
|
||||||
case jiffy:decode(Packet, [return_maps]) of
|
case jiffy:decode(Packet, [return_maps]) of
|
||||||
#{<<"id">> := Id, <<"method">> := <<"push_config">>, <<"params">> := #{<<"config">> := ConfigJson, <<"timeout">> := Timeout}} ->
|
#{<<"id">> := Id, <<"method">> := <<"push_config">>, <<"params">> := #{<<"config">> := ConfigJson}} ->
|
||||||
Ref = make_ref(),
|
Ref = make_ref(),
|
||||||
ControllerPid ! {push_config, Ref, ConfigJson},
|
ControllerPid ! {push_config, Ref, ConfigJson},
|
||||||
Reply =
|
Reply =
|
||||||
@ -227,14 +256,12 @@ handle_info({tcp, Socket, <<?PACKET_PUSH, Packet/binary>>}, State = #state{socke
|
|||||||
#{<<"id">> => Id, <<"result">> => <<"ok">>};
|
#{<<"id">> => Id, <<"result">> => <<"ok">>};
|
||||||
{push_config_reply, Ref, {error, Reason}} when is_binary(Reason) ->
|
{push_config_reply, Ref, {error, Reason}} when is_binary(Reason) ->
|
||||||
#{<<"id">> => Id, <<"error">> => #{<<"code">> => -1, <<"message">> => Reason}}
|
#{<<"id">> => Id, <<"error">> => #{<<"code">> => -1, <<"message">> => Reason}}
|
||||||
after Timeout * 1000 ->
|
|
||||||
#{<<"id">> => Id, <<"error">> => #{<<"code">> => -2, <<"message">> => <<"timeout">>}}
|
|
||||||
end,
|
end,
|
||||||
Packet = jiffy:encode(Reply, [force_utf8]),
|
Packet = jiffy:encode(Reply, [force_utf8]),
|
||||||
ok = gen_tcp:send(Socket, Packet),
|
ok = gen_tcp:send(Socket, Packet),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
#{<<"id">> := Id, <<"method">> := <<"invoke">>, <<"params">> := #{<<"payload">> := Payload, <<"timeout">> := Timeout}} ->
|
#{<<"id">> := Id, <<"method">> := <<"invoke">>, <<"params">> := #{<<"payload">> := Payload}} ->
|
||||||
Ref = make_ref(),
|
Ref = make_ref(),
|
||||||
ControllerPid ! {invoke, Ref, Payload},
|
ControllerPid ! {invoke, Ref, Payload},
|
||||||
Reply =
|
Reply =
|
||||||
@ -243,14 +270,18 @@ handle_info({tcp, Socket, <<?PACKET_PUSH, Packet/binary>>}, State = #state{socke
|
|||||||
#{<<"id">> => Id, <<"result">> => Result};
|
#{<<"id">> => Id, <<"result">> => Result};
|
||||||
{invoke_reply, Ref, {error, Reason}} when is_binary(Reason) ->
|
{invoke_reply, Ref, {error, Reason}} when is_binary(Reason) ->
|
||||||
#{<<"id">> => Id, <<"error">> => #{<<"code">> => -1, <<"message">> => Reason}}
|
#{<<"id">> => Id, <<"error">> => #{<<"code">> => -1, <<"message">> => Reason}}
|
||||||
after Timeout * 1000 ->
|
|
||||||
#{<<"id">> => Id, <<"error">> => #{<<"code">> => -2, <<"message">> => <<"invoke timeout">>}}
|
|
||||||
end,
|
end,
|
||||||
Packet = jiffy:encode(Reply, [force_utf8]),
|
Packet = jiffy:encode(Reply, [force_utf8]),
|
||||||
ok = gen_tcp:send(Socket, Packet),
|
ok = gen_tcp:send(Socket, Packet),
|
||||||
{noreply, State}
|
{noreply, State}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
%% pub/sub的消息
|
||||||
|
handle_info({tcp, Socket, <<?PACKET_PUB, Packet/binary>>}, State = #state{socket = Socket, controller_process = ControllerPid}) ->
|
||||||
|
#{<<"topic">> := Topic, <<"content">> := Content} = jiffy:decode(Packet, [return_maps]),
|
||||||
|
ControllerPid ! {pub, Topic, Content},
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
%% 其他消息为非法消息
|
%% 其他消息为非法消息
|
||||||
handle_info({tcp, Socket, Packet}, State = #state{socket = Socket}) ->
|
handle_info({tcp, Socket, Packet}, State = #state{socket = Socket}) ->
|
||||||
lager:debug("[efka_client] get unknown packet: ~p", [Packet]),
|
lager:debug("[efka_client] get unknown packet: ~p", [Packet]),
|
||||||
|
|||||||
@ -105,7 +105,7 @@ handle_cast({publish, Topic, Content}, State = #state{subscribers = Subscribers}
|
|||||||
%% 不需要回复的消息体采用广播的信息
|
%% 不需要回复的消息体采用广播的信息
|
||||||
MatchedSubscribers = match_subscribers(Subscribers, Topic),
|
MatchedSubscribers = match_subscribers(Subscribers, Topic),
|
||||||
lists:foreach(fun(#subscriber{subscriber_pid = SubscriberPid}) ->
|
lists:foreach(fun(#subscriber{subscriber_pid = SubscriberPid}) ->
|
||||||
SubscriberPid ! {topic_broadcast, Content}
|
SubscriberPid ! {topic_broadcast, Topic, Content}
|
||||||
end, MatchedSubscribers),
|
end, MatchedSubscribers),
|
||||||
|
|
||||||
efka_logger:debug("[efka_subscription] topic: ~p, content: ~p, match subscribers: ~p", [Topic, Content, MatchedSubscribers]),
|
efka_logger:debug("[efka_subscription] topic: ~p, content: ~p, match subscribers: ~p", [Topic, Content, MatchedSubscribers]),
|
||||||
|
|||||||
@ -20,6 +20,8 @@
|
|||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
|
%% 最大的等待时间
|
||||||
|
-define(PENDING_TIMEOUT, 10 * 1000).
|
||||||
%% 消息类型
|
%% 消息类型
|
||||||
|
|
||||||
%% 服务注册
|
%% 服务注册
|
||||||
@ -29,6 +31,8 @@
|
|||||||
%% 上传数据
|
%% 上传数据
|
||||||
-define(PACKET_PUSH, 16#03).
|
-define(PACKET_PUSH, 16#03).
|
||||||
|
|
||||||
|
-define(PACKET_PUB, 16#04).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
packet_id = 1,
|
packet_id = 1,
|
||||||
socket :: gen_tcp:socket(),
|
socket :: gen_tcp:socket(),
|
||||||
@ -36,7 +40,7 @@
|
|||||||
service_pid :: undefined | pid(),
|
service_pid :: undefined | pid(),
|
||||||
is_registered = false :: boolean(),
|
is_registered = false :: boolean(),
|
||||||
|
|
||||||
%% 请求响应的对应关系, #{packet_id => {ReceiverPid, Ref}}
|
%% 请求响应的对应关系, #{packet_id => {ReceiverPid, Ref}}; 自身的inflight需要超时逻辑处理
|
||||||
inflight = #{}
|
inflight = #{}
|
||||||
}).
|
}).
|
||||||
|
|
||||||
@ -93,17 +97,20 @@ handle_call(_Request, _From, State = #state{}) ->
|
|||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
%% 推送配置项目
|
%% 推送配置项目
|
||||||
handle_cast({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
|
handle_cast({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
|
||||||
PushConfig = #{<<"id">> => PacketId, <<"method">> => <<"push_config">>, <<"params">> => #{<<"config">> => ConfigJson, <<"timeout">> => 5}},
|
PushConfig = #{<<"id">> => PacketId, <<"method">> => <<"push_config">>, <<"params">> => #{<<"config">> => ConfigJson}},
|
||||||
Packet = jiffy:encode(PushConfig, [force_utf8]),
|
Packet = jiffy:encode(PushConfig, [force_utf8]),
|
||||||
ok = gen_tcp:send(Socket, <<?PACKET_PUSH:8, Packet/binary>>),
|
ok = gen_tcp:send(Socket, <<?PACKET_PUSH:8, Packet/binary>>),
|
||||||
|
|
||||||
|
erlang:start_timer(?PENDING_TIMEOUT, self(), {pending_timeout, PacketId}),
|
||||||
{noreply, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}};
|
{noreply, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}};
|
||||||
|
|
||||||
|
%% 远程调用
|
||||||
handle_cast({invoke, Ref, ReceiverPid, Payload}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
|
handle_cast({invoke, Ref, ReceiverPid, Payload}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
|
||||||
PushConfig = #{<<"id">> => PacketId, <<"method">> => <<"invoke">>, <<"params">> => #{<<"payload">> => Payload, <<"timeout">> => 5}},
|
PushConfig = #{<<"id">> => PacketId, <<"method">> => <<"invoke">>, <<"params">> => #{<<"payload">> => Payload}},
|
||||||
Packet = jiffy:encode(PushConfig, [force_utf8]),
|
Packet = jiffy:encode(PushConfig, [force_utf8]),
|
||||||
|
|
||||||
ok = gen_tcp:send(Socket, <<?PACKET_PUSH:8, Packet/binary>>),
|
ok = gen_tcp:send(Socket, <<?PACKET_PUSH:8, Packet/binary>>),
|
||||||
|
|
||||||
|
erlang:start_timer(?PENDING_TIMEOUT, self(), {pending_timeout, PacketId}),
|
||||||
{noreply, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}};
|
{noreply, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}};
|
||||||
|
|
||||||
handle_cast(_Request, State = #state{}) ->
|
handle_cast(_Request, State = #state{}) ->
|
||||||
@ -127,10 +134,59 @@ handle_info({tcp, Socket, <<?PACKET_REQUEST:8, Data/binary>>}, State = #state{so
|
|||||||
|
|
||||||
%% 处理client的响应
|
%% 处理client的响应
|
||||||
handle_info({tcp, Socket, <<?PACKET_RESPONSE:8, Data/binary>>}, State = #state{socket = Socket, inflight = Inflight}) ->
|
handle_info({tcp, Socket, <<?PACKET_RESPONSE:8, Data/binary>>}, State = #state{socket = Socket, inflight = Inflight}) ->
|
||||||
Response = jiffy:decode(Data, [return_maps]),
|
Resp = jiffy:decode(Data, [return_maps]),
|
||||||
NInflight = reply(Response, Inflight),
|
case Resp of
|
||||||
|
#{<<"id">> := Id, <<"result">> := Result} ->
|
||||||
|
case maps:take(Id, Inflight) of
|
||||||
|
error ->
|
||||||
|
lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, Id]),
|
||||||
|
{noreply, State};
|
||||||
|
{{ReceiverPid, Ref}, NInflight} ->
|
||||||
|
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
|
||||||
|
true ->
|
||||||
|
ReceiverPid ! {channel_reply, Ref, {ok, Result}};
|
||||||
|
false ->
|
||||||
|
lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, Id])
|
||||||
|
end,
|
||||||
|
{noreply, State#state{inflight = NInflight}}
|
||||||
|
end;
|
||||||
|
#{<<"id">> := Id, <<"error">> := #{<<"code">> := _Code, <<"message">> := Error}} ->
|
||||||
|
case maps:take(Id, Inflight) of
|
||||||
|
error ->
|
||||||
|
lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, Id]),
|
||||||
|
{noreply, State};
|
||||||
|
{{ReceiverPid, Ref}, NInflight} ->
|
||||||
|
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
|
||||||
|
true ->
|
||||||
|
ReceiverPid ! {channel_reply, Ref, {error, Error}};
|
||||||
|
false ->
|
||||||
|
lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, Id])
|
||||||
|
end,
|
||||||
|
{noreply, State#state{inflight = NInflight}}
|
||||||
|
end
|
||||||
|
end;
|
||||||
|
|
||||||
{noreply, State#state{inflight = NInflight}};
|
%% 超时逻辑处理
|
||||||
|
handle_info({timeout, _, {pending_timeout, Id}}, State = #state{inflight = Inflight}) ->
|
||||||
|
case maps:take(Id, Inflight) of
|
||||||
|
error ->
|
||||||
|
{noreply, State};
|
||||||
|
{{ReceiverPid, Ref}, NInflight} ->
|
||||||
|
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
|
||||||
|
true ->
|
||||||
|
ReceiverPid ! {channel_reply, Ref, {error, <<"timeout">>}};
|
||||||
|
false ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
{noreply, State#state{inflight = NInflight}}
|
||||||
|
end;
|
||||||
|
|
||||||
|
%% 订阅的消息
|
||||||
|
handle_info({topic_broadcast, Topic, Content}, State = #state{socket = Socket}) ->
|
||||||
|
Packet = jiffy:encode(#{<<"topic">> => Topic, <<"content">> => Content}, [force_utf8]),
|
||||||
|
ok = gen_tcp:send(Socket, <<?PACKET_PUB:8, Packet/binary>>),
|
||||||
|
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
handle_info(Info, State = #state{}) ->
|
handle_info(Info, State = #state{}) ->
|
||||||
lager:debug("[tcp_channel] get info: ~p", [Info]),
|
lager:debug("[tcp_channel] get info: ~p", [Info]),
|
||||||
@ -195,6 +251,11 @@ handle_request(#{<<"id">> := 0, <<"method">> := <<"metric_data">>, <<"params">>
|
|||||||
%% Event事件
|
%% Event事件
|
||||||
handle_request(#{<<"id">> := 0, <<"method">> := <<"event">>, <<"params">> := #{<<"event_type">> := EventType, <<"body">> := Body}}, State = #state{service_pid = ServicePid, is_registered = true}) ->
|
handle_request(#{<<"id">> := 0, <<"method">> := <<"event">>, <<"params">> := #{<<"event_type">> := EventType, <<"body">> := Body}}, State = #state{service_pid = ServicePid, is_registered = true}) ->
|
||||||
efka_service:send_event(ServicePid, EventType, Body),
|
efka_service:send_event(ServicePid, EventType, Body),
|
||||||
|
{ok, State};
|
||||||
|
|
||||||
|
%% 订阅事件
|
||||||
|
handle_request(#{<<"id">> := 0, <<"method">> := <<"subscribe">>, <<"params">> := #{<<"topic">> := Topic}}, State = #state{is_registered = true}) ->
|
||||||
|
efka_subscription:subscribe(Topic, self()),
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
%% 采用32位编码
|
%% 采用32位编码
|
||||||
@ -222,33 +283,3 @@ json_error(Id, Code, Message) when is_integer(Id), is_integer(Code), is_binary(M
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
jiffy:encode(Response, [force_utf8]).
|
jiffy:encode(Response, [force_utf8]).
|
||||||
|
|
||||||
-spec reply(Resp :: map(), Inflight :: map()) -> NInflight :: map().
|
|
||||||
reply(Resp = #{<<"id">> := Id, <<"result">> := Result}, Inflight) ->
|
|
||||||
case maps:take(Id, Inflight) of
|
|
||||||
error ->
|
|
||||||
lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, Id]),
|
|
||||||
Inflight;
|
|
||||||
{{ReceiverPid, Ref}, NInflight} ->
|
|
||||||
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
|
|
||||||
true ->
|
|
||||||
ReceiverPid ! {channel_reply, Ref, {ok, Result}};
|
|
||||||
false ->
|
|
||||||
lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, Id])
|
|
||||||
end,
|
|
||||||
NInflight
|
|
||||||
end;
|
|
||||||
reply(Resp = #{<<"id">> := Id, <<"error">> := #{<<"code">> := _Code, <<"message">> := Error}}, Inflight) ->
|
|
||||||
case maps:take(Id, Inflight) of
|
|
||||||
error ->
|
|
||||||
lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, Id]),
|
|
||||||
Inflight;
|
|
||||||
{{ReceiverPid, Ref}, NInflight} ->
|
|
||||||
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
|
|
||||||
true ->
|
|
||||||
ReceiverPid ! {channel_reply, Ref, {error, Error}};
|
|
||||||
false ->
|
|
||||||
lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, Id])
|
|
||||||
end,
|
|
||||||
NInflight
|
|
||||||
end.
|
|
||||||
Loading…
x
Reference in New Issue
Block a user