Class: ActionCable::SubscriptionAdapter::PostgreSQL::Listener
- Inherits:
-
SubscriberMap
- Object
- SubscriberMap
- ActionCable::SubscriptionAdapter::PostgreSQL::Listener
show all
- Defined in:
- lib/action_cable/subscription_adapter/postgresql.rb
Instance Method Summary
collapse
#add_subscriber, #broadcast, #remove_subscriber
Constructor Details
#initialize(adapter, event_loop) ⇒ Listener
Returns a new instance of Listener.
49
50
51
52
53
54
55
56
57
58
59
60
|
# File 'lib/action_cable/subscription_adapter/postgresql.rb', line 49
# Builds the listener and immediately starts its background thread.
#
# adapter    - object stored in @adapter; #listen calls +with_connection+ on it.
# event_loop - object stored in @event_loop; callbacks are handed to its +post+.
#
# A fresh Queue is created for commands sent to the background thread.
def initialize(adapter, event_loop)
  super()

  @adapter, @event_loop = adapter, event_loop
  @queue = Queue.new

  # The queue and collaborators are assigned before the thread starts, so
  # #listen can rely on them from its first iteration.
  @thread = Thread.new {
    # Set on the listener thread itself, before any listening work begins.
    Thread.current.abort_on_exception = true
    listen
  }
end
|
Instance Method Details
#add_channel(channel, on_success) ⇒ Object
93
94
95
|
# File 'lib/action_cable/subscription_adapter/postgresql.rb', line 93
# Enqueues a +:listen+ command for +channel+.
#
# The command is picked up by #listen, which issues the LISTEN statement and
# then posts +on_success+ to the event loop when it is not nil.
def add_channel(channel, on_success)
  command = [:listen, channel, on_success]
  @queue << command
end
|
#invoke_callback ⇒ Object
101
102
103
|
# File 'lib/action_cable/subscription_adapter/postgresql.rb', line 101
# Defers the inherited +invoke_callback+ to the event loop.
#
# Instead of running the parent implementation right away, a block is posted
# to @event_loop; when that block runs it calls +super+ with the arguments
# this method originally received.
def invoke_callback(*)
  @event_loop.post do
    # Bare +super+ forwards the original arguments unchanged.
    super
  end
end
|
#listen ⇒ Object
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
# File 'lib/action_cable/subscription_adapter/postgresql.rb', line 62
# Main loop of the listener thread.
#
# Runs inside @adapter.with_connection and alternates between two steps until
# a +:shutdown+ command is seen:
#
# 1. Drain every command currently in @queue:
#    +:listen+   - execute LISTEN for the channel, then post the callback
#                  (if any) to the event loop.
#    +:unlisten+ - execute UNLISTEN for the channel.
#    +:shutdown+ - throw +:shutdown+, which unwinds out of the loop.
# 2. Call +wait_for_notify(1)+ on the connection and broadcast any
#    notification it yields. (The argument is presumably a timeout in
#    seconds - confirm against the pg gem documentation.)
def listen
  @adapter.with_connection do |connection|
    catch(:shutdown) do
      loop do
        # Non-blocking pop is safe here: emptiness was checked just above and
        # this method is the only place in the class that pops from the queue.
        until @queue.empty?
          command, name, on_success = @queue.pop(true)

          case command
          when :listen
            connection.exec("LISTEN #{connection.escape_identifier(name)}")
            @event_loop.post(&on_success) if on_success
          when :unlisten
            connection.exec("UNLISTEN #{connection.escape_identifier(name)}")
          when :shutdown
            throw(:shutdown)
          end
        end

        connection.wait_for_notify(1) do |notified_channel, _pid, payload|
          broadcast(notified_channel, payload)
        end
      end
    end
  end
end
|
#remove_channel(channel) ⇒ Object
97
98
99
|
# File 'lib/action_cable/subscription_adapter/postgresql.rb', line 97
# Enqueues an +:unlisten+ command for +channel+.
#
# The command is picked up by #listen, which issues the UNLISTEN statement.
def remove_channel(channel)
  command = [:unlisten, channel]
  @queue << command
end
|
#shutdown ⇒ Object
88
89
90
91
|
# File 'lib/action_cable/subscription_adapter/postgresql.rb', line 88
# Asks the listener thread to stop and waits until it has finished.
#
# A +:shutdown+ command is enqueued for #listen; the calling thread then
# yields the scheduler repeatedly for as long as @thread is still alive.
# Returns nil.
def shutdown
  @queue << [:shutdown]

  while @thread.alive?
    Thread.pass
  end
end
|