Class: TinyQ::Bucket
- Inherits:
-
Object
- Object
- TinyQ::Bucket
- Defined in:
- lib/tinyq/bucket.rb
Overview
Bucket, messages get dropped into the bucket and forwarded to the different funnels connected to that bucket
Instance Attribute Summary collapse
-
#funnels ⇒ Object
Returns the value of attribute funnels.
-
#message_ids ⇒ Object
readonly
Returns the value of attribute message_ids.
-
#messages ⇒ Object
readonly
Returns the value of attribute messages.
-
#name ⇒ Object
Returns the value of attribute name.
-
#pendings ⇒ Object
readonly
Returns the value of attribute pendings.
-
#permanent ⇒ Object
Returns the value of attribute permanent.
-
#references ⇒ Object
readonly
Returns the value of attribute references.
Instance Method Summary collapse
- #dequeue(funnel) ⇒ Object
- #feed_funnel(funnel) ⇒ Object
- #funnel(name, broadcaster = false) ⇒ Object
-
#initialize(n, p = false) ⇒ Bucket
constructor
A new instance of Bucket.
- #message_sent(funnel, message_id) ⇒ Object
- #put_message(message) ⇒ Object
Constructor Details
#initialize(n, p = false) ⇒ Bucket
Returns a new instance of Bucket.
18 19 20 21 22 23 24 25 26 27 |
# File 'lib/tinyq/bucket.rb', line 18 def initialize(n, p = false) @name = n @permanent = p @messages = {} @message_ids = [] @references = {} @pendings = {} @funnels = {} @uuid = UUID.new end |
Instance Attribute Details
#funnels ⇒ Object
Returns the value of attribute funnels.
14 15 16 |
# File 'lib/tinyq/bucket.rb', line 14 def funnels @funnels end |
#message_ids ⇒ Object (readonly)
Returns the value of attribute message_ids.
10 11 12 |
# File 'lib/tinyq/bucket.rb', line 10 def @message_ids end |
#messages ⇒ Object (readonly)
Returns the value of attribute messages.
9 10 11 |
# File 'lib/tinyq/bucket.rb', line 9 def @messages end |
#name ⇒ Object
Returns the value of attribute name.
7 8 9 |
# File 'lib/tinyq/bucket.rb', line 7 def name @name end |
#pendings ⇒ Object (readonly)
Returns the value of attribute pendings.
12 13 14 |
# File 'lib/tinyq/bucket.rb', line 12 def pendings @pendings end |
#permanent ⇒ Object
Returns the value of attribute permanent.
16 17 18 |
# File 'lib/tinyq/bucket.rb', line 16 def permanent @permanent end |
#references ⇒ Object (readonly)
Returns the value of attribute references.
11 12 13 |
# File 'lib/tinyq/bucket.rb', line 11 def references @references end |
Instance Method Details
#dequeue(funnel) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/tinyq/bucket.rb', line 51 def dequeue(funnel) if !@messages.empty? = nil @message_ids.each do |mid| $LOG.debug("Bucket #{@name} - #{mid} references: #{@references[mid]}") if @references[mid].count(funnel.name) > 0 # That message was not de-referenced yet $LOG.debug("Bucket #{@name} - #{mid} -> #{funnel.name}") = mid break end end if nil != $LOG.debug("Bucket #{@name} - Sending #{} to funnel #{funnel.name}") = @messages[] # Remove the given funnel from a reference @references[].delete(funnel.name) # Add the given funnel to the pending list for that message @pendings[].push(funnel.name) [, ] else $LOG.debug("Bucket #{@name} - No more messages for funnel #{funnel.name}") [nil, nil] end else [nil, nil] end end |
#feed_funnel(funnel) ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/tinyq/bucket.rb', line 116 def feed_funnel(funnel) @funnels[funnel.name] = funnel # Add ourself to the list of buckets for that funnel if funnel.buckets.count self == 0 funnel.buckets.push(self) end # If we have cached messages, notify new funnel if !@messages.empty? # Add the current funnels as a reference @message_ids.each do |mid| @references[mid].push(funnel.name) end $LOG.debug("Bucket #{@name} - Notifying funnel #{funnel.name}") funnel.notify(self) end end |
#funnel(name, broadcaster = false) ⇒ Object
103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/tinyq/bucket.rb', line 103 def funnel(name, broadcaster = false) funnel = @funnels[name] if nil == funnel $LOG.info("Bucket #{@name} - Creating funnel #{name}") funnel = Funnel.new(name, broadcaster) feed_funnel(funnel) end # Update potential settings funnel.broadcaster = broadcaster funnel end |
#message_sent(funnel, message_id) ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/tinyq/bucket.rb', line 84 def (funnel, ) $LOG.debug("Bucket #{@name} - Message #{} sent on #{funnel.name}") @pendings[].delete(funnel.name) # If no funnels are either pending or referenced, then message can be removed if @pendings[].empty? && @references[].empty? $LOG.debug("Bucket #{@name} - Purge message #{}") # No more references, message can be deleted Permanent.remove "#{}.dat" unless !@permanent @messages.delete() @message_ids.delete() @references.delete() @pendings.delete() true end false end |
#put_message(message) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/tinyq/bucket.rb', line 29 def () = @uuid.generate #message[:__id] = message_id #message[:__sent] = Time.now.iso8601 # If permantent bucket, then store Permanent.store ,"#{}.dat", { :gzip => true } unless !@permanent @messages[] = @message_ids.push() @references[] = @funnels.keys @pendings[] = [] if !@funnels.empty? # Put message in each funnel @funnels.each do |n,funnel| $LOG.debug("Bucket #{@name} - Notifying funnel #{funnel.name}") funnel.notify(self) end end end |