Class: ActionCable::SubscriptionAdapter::Redis::Listener
- Inherits:
-
SubscriberMap::Async
- Object
- SubscriberMap
- SubscriberMap::Async
- ActionCable::SubscriptionAdapter::Redis::Listener
- Defined in:
- lib/action_cable/subscription_adapter/redis.rb
Defined Under Namespace
Classes: SubscribedClient
Instance Method Summary collapse
- #add_channel(channel, on_success) ⇒ Object
-
#initialize(adapter, config_options, executor) ⇒ Listener
constructor
A new instance of Listener.
- #listen(conn) ⇒ Object
- #remove_channel(channel) ⇒ Object
- #shutdown ⇒ Object
Methods inherited from SubscriberMap::Async
#add_subscriber, #invoke_callback, #remove_subscriber
Methods inherited from SubscriberMap
#add_subscriber, #broadcast, #invoke_callback, #remove_subscriber
Constructor Details
#initialize(adapter, config_options, executor) ⇒ Listener
Returns a new instance of Listener.
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/action_cable/subscription_adapter/redis.rb', line 71 def initialize(adapter, , executor) super(executor) @adapter = adapter @subscribe_callbacks = Hash.new { |h, k| h[k] = [] } @subscription_lock = Mutex.new @reconnect_attempt = 0 # Use the same config as used by Redis conn @reconnect_attempts = .fetch(:reconnect_attempts, 1) @reconnect_attempts = Array.new(@reconnect_attempts, 0) if @reconnect_attempts.is_a?(Integer) @subscribed_client = nil @when_connected = [] @thread = nil end |
Instance Method Details
#add_channel(channel, on_success) ⇒ Object
143 144 145 146 147 148 149 |
# File 'lib/action_cable/subscription_adapter/redis.rb', line 143 def add_channel(channel, on_success) @subscription_lock.synchronize do ensure_listener_running @subscribe_callbacks[channel] << on_success when_connected { @subscribed_client.subscribe(channel) } end end |
#listen(conn) ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/action_cable/subscription_adapter/redis.rb', line 91 def listen(conn) conn.without_reconnect do original_client = extract_subscribed_client(conn) conn.subscribe("_action_cable_internal") do |on| on.subscribe do |chan, count| @subscription_lock.synchronize do if count == 1 @reconnect_attempt = 0 @subscribed_client = original_client until @when_connected.empty? @when_connected.shift.call end end if callbacks = @subscribe_callbacks[chan] next_callback = callbacks.shift @executor.post(&next_callback) if next_callback @subscribe_callbacks.delete(chan) if callbacks.empty? end end end on. do |chan, | broadcast(chan, ) end on.unsubscribe do |chan, count| if count == 0 @subscription_lock.synchronize do @subscribed_client = nil end end end end end end |
#remove_channel(channel) ⇒ Object
151 152 153 154 155 |
# File 'lib/action_cable/subscription_adapter/redis.rb', line 151 def remove_channel(channel) @subscription_lock.synchronize do when_connected { @subscribed_client.unsubscribe(channel) } end end |
#shutdown ⇒ Object
130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/action_cable/subscription_adapter/redis.rb', line 130 def shutdown @subscription_lock.synchronize do return if @thread.nil? when_connected do @subscribed_client.unsubscribe @subscribed_client = nil end end Thread.pass while @thread.alive? end |