fix channel
This commit is contained in:
parent
4f6556c2a2
commit
53d0ee9f3a
@ -17,7 +17,6 @@
|
|||||||
%% API
|
%% API
|
||||||
-export([start_link/2]).
|
-export([start_link/2]).
|
||||||
-export([get_name/1, get_pid/1, attach_channel/2]).
|
-export([get_name/1, get_pid/1, attach_channel/2]).
|
||||||
-export([invoke/3]).
|
|
||||||
-export([metric_data/4, send_event/3]).
|
-export([metric_data/4, send_event/3]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
@ -26,11 +25,7 @@
|
|||||||
-record(state, {
|
-record(state, {
|
||||||
service_id :: binary(),
|
service_id :: binary(),
|
||||||
%% 通道id信息
|
%% 通道id信息
|
||||||
channel_pid :: pid() | undefined,
|
channel_pid :: pid() | undefined
|
||||||
|
|
||||||
inflight = #{},
|
|
||||||
%% 映射关系: #{Ref => Fun}
|
|
||||||
callbacks = #{}
|
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
@ -45,10 +40,6 @@ get_name(ServiceId) when is_binary(ServiceId) ->
|
|||||||
get_pid(ServiceId) when is_binary(ServiceId) ->
|
get_pid(ServiceId) when is_binary(ServiceId) ->
|
||||||
whereis(get_name(ServiceId)).
|
whereis(get_name(ServiceId)).
|
||||||
|
|
||||||
-spec invoke(Pid :: pid(), Ref :: reference(), Payload :: binary()) -> no_return().
|
|
||||||
invoke(Pid, Ref, Payload) when is_pid(Pid), is_reference(Ref), is_binary(Payload) ->
|
|
||||||
gen_server:cast(Pid, {invoke, Ref, self(), Payload}).
|
|
||||||
|
|
||||||
-spec metric_data(Pid :: pid(), DeviceUUID :: binary(), RouteKey :: binary(), Metric :: binary()) -> no_return().
|
-spec metric_data(Pid :: pid(), DeviceUUID :: binary(), RouteKey :: binary(), Metric :: binary()) -> no_return().
|
||||||
metric_data(Pid, DeviceUUID, RouteKey, Metric) when is_pid(Pid), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Metric) ->
|
metric_data(Pid, DeviceUUID, RouteKey, Metric) when is_pid(Pid), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Metric) ->
|
||||||
gen_server:cast(Pid, {metric_data, DeviceUUID, RouteKey, Metric}).
|
gen_server:cast(Pid, {metric_data, DeviceUUID, RouteKey, Metric}).
|
||||||
@ -78,7 +69,6 @@ start_link(Name, ServiceId) when is_atom(Name), is_binary(ServiceId) ->
|
|||||||
{stop, Reason :: term()} | ignore).
|
{stop, Reason :: term()} | ignore).
|
||||||
init([ServiceId]) ->
|
init([ServiceId]) ->
|
||||||
%% supervisor进程通过exit(ChildPid, shutdown)调用的时候,确保terminate函数被调用
|
%% supervisor进程通过exit(ChildPid, shutdown)调用的时候,确保terminate函数被调用
|
||||||
erlang:process_flag(trap_exit, true),
|
|
||||||
lager:debug("[efka_service] service_id: ~p, started", [ServiceId]),
|
lager:debug("[efka_service] service_id: ~p, started", [ServiceId]),
|
||||||
{ok, #state{service_id = ServiceId}}.
|
{ok, #state{service_id = ServiceId}}.
|
||||||
|
|
||||||
@ -122,17 +112,6 @@ handle_cast({send_event, EventType, Params}, State = #state{service_id = Service
|
|||||||
lager:debug("[efka_service] send_event, service_id: ~p, event_type: ~p, params: ~p", [ServiceId, EventType, Params]),
|
lager:debug("[efka_service] send_event, service_id: ~p, event_type: ~p, params: ~p", [ServiceId, EventType, Params]),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
%% 推送配置项目
|
|
||||||
handle_cast({invoke, Ref, ReceiverPid, Payload}, State = #state{channel_pid = ChannelPid, inflight = Inflight}) ->
|
|
||||||
case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of
|
|
||||||
true ->
|
|
||||||
gen_channel:invoke(ChannelPid, Ref, self(), Payload),
|
|
||||||
{noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}};
|
|
||||||
false ->
|
|
||||||
ReceiverPid ! {service_reply, Ref, {error, <<"channel is not alive">>}},
|
|
||||||
{reply, State}
|
|
||||||
end;
|
|
||||||
|
|
||||||
handle_cast(_Request, State = #state{}) ->
|
handle_cast(_Request, State = #state{}) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
@ -142,26 +121,10 @@ handle_cast(_Request, State = #state{}) ->
|
|||||||
{noreply, NewState :: #state{}} |
|
{noreply, NewState :: #state{}} |
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
|
|
||||||
%% 处理channel的回复
|
|
||||||
handle_info({channel_reply, Ref, Reply}, State = #state{inflight = Inflight, callbacks = Callbacks}) ->
|
|
||||||
case maps:take(Ref, Inflight) of
|
|
||||||
error ->
|
|
||||||
{noreply, State};
|
|
||||||
{ReceiverPid, NInflight} ->
|
|
||||||
ReceiverPid ! {service_reply, Ref, Reply},
|
|
||||||
|
|
||||||
{noreply, State#state{inflight = NInflight, callbacks = trigger_callback(Ref, Callbacks)}}
|
|
||||||
end;
|
|
||||||
|
|
||||||
handle_info({Port, {data, Data}}, State = #state{service_id = ServiceId}) when is_port(Port) ->
|
|
||||||
lager:debug("[efka_service] service_id: ~p, port data: ~p", [ServiceId, Data]),
|
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
%% 处理channel进程的退出
|
%% 处理channel进程的退出
|
||||||
handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, service_id = ServiceId}) ->
|
handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, service_id = ServiceId}) ->
|
||||||
lager:debug("[efka_service] service_id: ~p, channel exited: ~p", [ServiceId, Reason]),
|
lager:debug("[efka_service] service_id: ~p, channel exited: ~p", [ServiceId, Reason]),
|
||||||
{noreply, State#state{channel_pid = undefined, inflight = #{}}}.
|
{noreply, State#state{channel_pid = undefined}}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc This function is called by a gen_server when it is about to
|
%% @doc This function is called by a gen_server when it is about to
|
||||||
@ -185,13 +148,3 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
|
|||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
|
||||||
-spec trigger_callback(Ref :: reference(), Callbacks :: map()) -> NewCallbacks :: map().
|
|
||||||
trigger_callback(Ref, Callbacks) ->
|
|
||||||
case maps:take(Ref, Callbacks) of
|
|
||||||
error ->
|
|
||||||
Callbacks;
|
|
||||||
{Fun, NCallbacks} ->
|
|
||||||
catch Fun(),
|
|
||||||
NCallbacks
|
|
||||||
end.
|
|
||||||
Loading…
x
Reference in New Issue
Block a user