Class: TinyQ::Funnel
- Inherits: Object
- Defined in: lib/tinyq/funnel.rb
Overview
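A Funnel receives bucket notifications on its internal queue, dequeues the pending message from the notifying bucket, and dispatches it to its subscribers: to every subscriber when the funnel is a broadcaster, otherwise to a single subscriber.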
Instance Attribute Summary
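- #broadcaster ⇒ Object
  When true, each message is sent to every subscriber; when false (the default), it is sent to the first subscriber only.
- #buckets ⇒ Object
  Array of the buckets this funnel is connected to; empty on construction.
- #name ⇒ Object
  Name of the funnel, used in its log messages.
- #queue ⇒ Object (readonly)
  The internal EventMachine::Queue on which bucket notifications arrive.
- #subscribers ⇒ Object
  Hash mapping each connection to its Subscriber.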
Instance Method Summary
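- #add_connection(c, n = 1) ⇒ Object
  Registers connection c as a subscriber (passing n to the new Subscriber) unless it is already registered, then notifies the funnel of every connected bucket that has pending messages.
- #initialize(n, b = false) ⇒ Funnel (constructor)
  A new instance of Funnel.
- #notify(bucket) ⇒ Object
  Method called by bucket when messages are available.
- #remove_connection(c) ⇒ Object
  Removes the subscriber registered for connection c.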
Constructor Details
#initialize(n, b = false) ⇒ Funnel
Returns a new instance of Funnel.
# File 'lib/tinyq/funnel.rb', line 13

def initialize(n, b = false)
  @name = n
  @broadcaster = b
  @queue = EventMachine::Queue.new
  @subscribers = {}
  @buckets = []

  cb = Proc.new do |event|
    $LOG.debug("Funnel #{@name} - Callback")
    if !@subscribers.empty?
      # OK we can dequeue from bucket since we have somewhere to send it
      bucket = event[:Bucket]
      , = bucket.dequeue(self)
      $LOG.debug("Funnel #{@name} - Callback got '#{}'")
      if != nil
        if @broadcaster
          $LOG.debug("Funnel #{@name} - Broadcasting #{}")
          @subscribers.each do |c,subscriber|
            if subscriber.(bucket, self, , )
              # Subscriber is done removing
              $LOG.debug("Funnel #{@name} - Subscriber received requested count")
              self.remove_connection(c)
            end
          end
        else
          $LOG.debug("Funnel #{@name} - Unicasting #{}")
          c = @subscribers.keys[0]
          subscriber = @subscribers[c]
          $LOG.debug("Funnel #{@name} - Unicasting to #{subscriber.connection.ip}:#{subscriber.connection.port}")
          if subscriber.(bucket, self, , )
            # Subscriber is done removing
            $LOG.debug("Funnel #{@name} - Subscriber #{subscriber.connection.ip}:#{subscriber.connection.port} received requested count")
            self.remove_connection(c)
          end
        end
      else
        $LOG.debug("Funnel #{@name} - Callback noop")
      end
    end

    # Wait for next event
    @queue.pop &cb
  end

  @queue.pop &cb
end
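The callback's two delivery branches can be illustrated without EventMachine. The following is a minimal sketch, not TinyQ's code: `SketchSubscriber`, its `deliver` method, and the `dispatch` helper are hypothetical names introduced here. It assumes a subscriber reports "done" once it has received the number of messages it requested, which is what the log messages in the listing suggest.

```ruby
# Hypothetical stand-in for a TinyQ subscriber: it records what it receives
# and reports "done" once it has received the number of messages it asked for.
class SketchSubscriber
  attr_reader :received

  def initialize(requested)
    @requested = requested
    @received = []
  end

  # Returns true when the requested count has been reached.
  def deliver(message)
    @received << message
    @received.size >= @requested
  end
end

# Dispatch one message to a subscribers hash, mirroring the two branches of
# the constructor's callback. Subscribers that report "done" are removed.
def dispatch(subscribers, message, broadcaster)
  return false if subscribers.empty? # nowhere to send: nothing is delivered

  # Broadcast targets every subscriber; unicast targets only the first key.
  targets = broadcaster ? subscribers.keys : [subscribers.keys.first]
  targets.each do |key|
    subscribers.delete(key) if subscribers[key].deliver(message)
  end
  true
end

subs = { a: SketchSubscriber.new(1), b: SketchSubscriber.new(2) }
a, b = subs[:a], subs[:b]

dispatch(subs, "m1", true)   # broadcast: both receive "m1"; :a is done and removed
dispatch(subs, "m2", false)  # unicast: only the first remaining subscriber, :b
```

After both calls, `a.received` is `["m1"]`, `b.received` is `["m1", "m2"]`, and `subs` is empty because each subscriber reached its requested count. The sketch iterates over a snapshot of the keys so that removing a subscriber never mutates the collection being traversed.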
Instance Attribute Details
#broadcaster ⇒ Object
Returns the value of attribute broadcaster.
# File 'lib/tinyq/funnel.rb', line 11

def broadcaster
  @broadcaster
end
#buckets ⇒ Object
Returns the value of attribute buckets.
# File 'lib/tinyq/funnel.rb', line 9

def buckets
  @buckets
end
#name ⇒ Object
Returns the value of attribute name.
# File 'lib/tinyq/funnel.rb', line 7

def name
  @name
end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
# File 'lib/tinyq/funnel.rb', line 8

def queue
  @queue
end
#subscribers ⇒ Object
Returns the value of attribute subscribers.
# File 'lib/tinyq/funnel.rb', line 10

def subscribers
  @subscribers
end
Instance Method Details
#add_connection(c, n = 1) ⇒ Object
# File 'lib/tinyq/funnel.rb', line 67

def add_connection(c,n = 1)
  subscriber = @subscribers[c]
  if nil == subscriber
    subscriber = Subscriber.new(c,n)
    @subscribers[c] = subscriber
  end

  # At this point, we need to see if any buckets we are connected to
  # have pending messages
  @buckets.each do |bucket|
    if !bucket..empty?
      self.notify(bucket)
    end
  end
end
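Two properties of add_connection are easy to miss: registering the same connection twice keeps the original Subscriber (and therefore its original n), and every call re-notifies for each bucket that has a backlog. The sketch below models both under stated assumptions: `SketchBucket`, `SketchFunnel`, the `pending` attribute, and the `notified` log are hypothetical names, with `pending` standing in for the bucket attribute that the listing tests with `.empty?`.

```ruby
# Hypothetical bucket: `pending` stands in for the bucket attribute that
# add_connection tests with `.empty?`.
SketchBucket = Struct.new(:name, :pending)

class SketchFunnel
  attr_reader :subscribers, :buckets, :notified

  def initialize
    @subscribers = {}
    @buckets = []
    @notified = [] # records notify calls instead of pushing onto a queue
  end

  def notify(bucket)
    @notified << bucket.name
  end

  # Register the connection once, then re-announce every bucket with a backlog.
  def add_connection(conn, count = 1)
    @subscribers[conn] ||= { connection: conn, requested: count }
    @buckets.each { |bucket| notify(bucket) unless bucket.pending.empty? }
  end
end

funnel = SketchFunnel.new
funnel.buckets << SketchBucket.new("busy", ["m1"]) << SketchBucket.new("idle", [])

funnel.add_connection(:conn1, 3)
funnel.add_connection(:conn1, 5) # already registered: requested count stays 3
```

Here `funnel.notified` ends up as `["busy", "busy"]`: the idle bucket is never announced, while the busy one is announced on each call, including the second call that registers nothing new.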
#notify(bucket) ⇒ Object
Called by a bucket when messages are available. Pushes a "New Message" event that references the bucket onto the funnel's internal queue.
# File 'lib/tinyq/funnel.rb', line 63

def notify(bucket)
  @queue.push({:Event => "New Message", :Bucket => bucket})
end
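The event is a plain hash, and the constructor's callback reads only its :Bucket key. The sketch below shows that round trip using Ruby's standard-library Queue purely as a stand-in: unlike EventMachine::Queue, whose pop takes a callback block, the stdlib pop returns the item directly. The `:orders_bucket` symbol is a hypothetical placeholder for a real bucket object.

```ruby
require "thread" # stdlib Queue, used here only as a stand-in for EventMachine::Queue

queue = Queue.new
bucket = :orders_bucket # hypothetical placeholder for a TinyQ bucket object

# Same event shape that Funnel#notify pushes.
queue.push({ :Event => "New Message", :Bucket => bucket })

event = queue.pop
source = event[:Bucket] # the only key the constructor's callback reads
```

Because the event carries a reference to the bucket rather than the message itself, the message stays in the bucket until the callback decides it has somewhere to send it.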
#remove_connection(c) ⇒ Object
# File 'lib/tinyq/funnel.rb', line 83

def remove_connection(c)
  $LOG.debug("Funnel #{@name} - Removing subscriber #{c.ip}:#{c.port}")
  @subscribers.delete(c)
end
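Since the method ends with Hash#delete, its return value follows that method's semantics: the removed value, or nil when the key is absent. Note that c must still respond to ip and port, because both are logged before the deletion. A small sketch with hypothetical symbol keys and string values in place of connections and Subscriber objects:

```ruby
# Hypothetical data: symbols stand in for connections, strings for subscribers.
subscribers = { conn_a: "subscriber A", conn_b: "subscriber B" }

removed = subscribers.delete(:conn_a)   # returns the removed value
missing = subscribers.delete(:conn_zzz) # unknown key: returns nil, raises nothing
```

A caller could therefore use the return value to tell whether the connection was actually subscribed.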