Class: LogStash::Outputs::WebSocket::Pubsub
- Inherits:
-
Object
- Object
- LogStash::Outputs::WebSocket::Pubsub
- Defined in:
- lib/logstash/outputs/websocket_topics/pubsub.rb
Instance Attribute Summary collapse
-
#logger ⇒ Object
Returns the value of attribute logger.
Instance Method Summary collapse
-
#initialize ⇒ Pubsub
constructor
A new instance of Pubsub.
-
#publish(object) ⇒ Object
def initialize.
-
#subscribe(&block) ⇒ Object
def Pubsub.
Constructor Details
#initialize ⇒ Pubsub
Returns a new instance of Pubsub.
8 9 10 11 |
# File 'lib/logstash/outputs/websocket_topics/pubsub.rb', line 8 def initialize @subscribers = [] @subscribers_lock = Mutex.new end |
Instance Attribute Details
#logger ⇒ Object
Returns the value of attribute logger.
6 7 8 |
# File 'lib/logstash/outputs/websocket_topics/pubsub.rb', line 6 def logger @logger end |
Instance Method Details
#publish(object) ⇒ Object
def initialize
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/logstash/outputs/websocket_topics/pubsub.rb', line 13 def publish(object) @subscribers_lock.synchronize do break if @subscribers.size == 0 failed = [] @subscribers.each do |subscriber| begin subscriber.call(object) rescue => e @logger.error("Failed to publish to subscriber", :subscriber => subscriber, :exception => e) failed << subscriber end end failed.each do |subscriber| @subscribers.delete(subscriber) end end # @subscribers_lock.synchronize end |
#subscribe(&block) ⇒ Object
def Pubsub
33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/logstash/outputs/websocket_topics/pubsub.rb', line 33 def subscribe(&block) queue = Queue.new @subscribers_lock.synchronize do @subscribers << proc do |event| queue << event end end while true block.call(queue.pop) end end |