Class: Krakow::Consumer::Queue
- Inherits:
-
Object
- Object
- Krakow::Consumer::Queue
- Includes:
- Celluloid, Utils::Lazy
- Defined in:
- lib/krakow/consumer/queue.rb
Instance Attribute Summary collapse
- #consumer ⇒ Consumer readonly
-
#pop_order ⇒ Array
readonly
Order of message removal.
-
#removal_callback ⇒ Symbol
readonly
Callback method name.
Instance Method Summary collapse
-
#deregister_connection(identifier) ⇒ Array<FrameType::Message>
Remove connection registration and remove all messages.
-
#initialize(consumer, *args) ⇒ self
constructor
Create new consumer queue instance.
-
#messages {|messages| ... } ⇒ Hash
Message container.
-
#pop ⇒ Object
(also: #deq)
Pop first item off the queue.
-
#push(message) ⇒ self
(also: #<<, #enq)
Push new message into queue.
-
#register_connection(connection) ⇒ TrueClass
Register a new connection.
-
#scrub_duplicate_message(message) ⇒ TrueClass, FalseClass
Remove duplicate message from queue if possible.
-
#size ⇒ Integer
Number of queued messages.
-
#validate_message(message) ⇒ Object
Validate message.
Methods included from Utils::Lazy
Methods included from Utils::Logging
Constructor Details
#initialize(consumer, *args) ⇒ self
Create new consumer queue instance
23 24 25 26 27 28 29 30 |
# File 'lib/krakow/consumer/queue.rb', line 23
#
# Create new consumer queue instance.
#
# @param consumer [Consumer] stored in @consumer
# @param args [Array] optional arguments; the first Hash found is used
#   as the options Hash
# @option args [Symbol] :removal_callback callback method name
# @return [self]
def initialize(consumer, *args)
  # `detect` yields nil when no Hash was supplied; fall back to an empty
  # Hash so the option lookup below cannot raise NoMethodError on nil.
  opts = args.detect { |arg| arg.is_a?(Hash) } || {}
  @consumer = consumer
  @removal_callback = opts[:removal_callback]
  @messages = {}
  @pop_order = []
  @cleaner = nil
end
Instance Attribute Details
#consumer ⇒ Consumer (readonly)
13 14 15 |
# File 'lib/krakow/consumer/queue.rb', line 13
#
# @return [Consumer] the value held in @consumer
def consumer
  @consumer
end
#pop_order ⇒ Array (readonly)
Returns order of message removal.
15 16 17 |
# File 'lib/krakow/consumer/queue.rb', line 15
#
# @return [Array] order of message removal (the value held in @pop_order)
def pop_order
  @pop_order
end
#removal_callback ⇒ Symbol (readonly)
Returns callback method name.
17 18 19 |
# File 'lib/krakow/consumer/queue.rb', line 17
#
# @return [Symbol] callback method name (the value held in @removal_callback)
def removal_callback
  @removal_callback
end
Instance Method Details
#deregister_connection(identifier) ⇒ Array<FrameType::Message>
Remove connection registration and remove all messages
59 60 61 62 63 64 65 |
# File 'lib/krakow/consumer/queue.rb', line 59
#
# Remove connection registration and remove all messages.
#
# NOTE(review): the `messages` receiver was missing from this listing and
# has been restored; confirm against lib/krakow/consumer/queue.rb.
#
# @param identifier [Object] key the connection was registered under
# @return [Array<FrameType::Message>, nil] whatever was stored under
#   identifier (nil when nothing was stored)
def deregister_connection(identifier)
  messages do |collection|
    removed = collection.delete(identifier)
    # Drop every pending pop slot that refers to this connection.
    pop_order.delete(identifier)
    removed
  end
end
#messages {|messages| ... } ⇒ Hash
Message container
36 37 38 39 40 41 42 |
# File 'lib/krakow/consumer/queue.rb', line 36
#
# Message container.
#
# NOTE(review): the method name was missing from this listing and has been
# restored from the `#messages` heading; confirm against
# lib/krakow/consumer/queue.rb.
#
# @yieldparam messages [Hash] the container held in @messages
# @return [Hash, Object] the container itself, or the block's result when
#   a block is given
def messages
  return @messages unless block_given?

  yield @messages
end
#pop ⇒ Object Also known as: deq
Pop first item off the queue
92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/krakow/consumer/queue.rb', line 92
#
# Pop first item off the queue.
#
# NOTE(review): every identifier containing `message` was missing from this
# listing (`message`, `messages`, `validate_message`) and has been restored;
# confirm against lib/krakow/consumer/queue.rb.
#
# @return [Object] the first queued item that validate_message does not
#   turn into nil
def pop
  message = nil
  # Keep going until a usable message is found: validate_message may turn a
  # dequeued message into nil, in which case the next slot is tried.
  until message
    # Nothing is pending; wait for the :new_message signal.
    wait(:new_message) if pop_order.empty?
    messages do |collection|
      key = pop_order.shift
      if key
        message = collection[key].shift
        message = validate_message(message)
      end
    end
  end
  message
end
#push(message) ⇒ self Also known as: <<, enq
Push new message into queue
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/krakow/consumer/queue.rb', line 71
#
# Push new message into queue.
#
# NOTE(review): the `message` parameter and the `messages` receiver were
# missing from this listing and have been restored; confirm against
# lib/krakow/consumer/queue.rb.
#
# @param message [FrameType::Message]
# @return [self] the value of current_actor
def push(message)
  unless message.is_a?(FrameType::Message)
    abort TypeError.new("Expecting `FrameType::Message` but received `#{message.class}`!")
  end
  messages do |collection|
    begin
      identifier = message.connection.identifier
      collection[identifier] << message
      # Record which connection's list the next pop should read from.
      pop_order << identifier
    rescue Celluloid::DeadActorError
      abort Error::ConnectionUnavailable.new
    end
  end
  # Announce the new message on the :new_message signal.
  signal(:new_message)
  current_actor
end
#register_connection(connection) ⇒ TrueClass
Register a new connection
48 49 50 51 52 53 |
# File 'lib/krakow/consumer/queue.rb', line 48
#
# Register a new connection.
#
# NOTE(review): the `messages` receiver was missing from this listing and
# has been restored; confirm against lib/krakow/consumer/queue.rb.
#
# @param connection [Object] must respond to #identifier
# @return [TrueClass]
def register_connection(connection)
  messages do |collection|
    # Each connection gets its own (initially empty) message list.
    collection[connection.identifier] = []
  end
  true
end
#scrub_duplicate_message(message) ⇒ TrueClass, FalseClass
Remove duplicate message from queue if possible
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/krakow/consumer/queue.rb', line 119
#
# Remove duplicate message from queue if possible.
#
# NOTE(review): the method name, the `message` parameter, the `messages`
# receiver and the compared attribute were missing from this listing. They
# have been restored from the heading; `message_id` as the compared
# attribute is an assumption. Confirm against lib/krakow/consumer/queue.rb.
#
# @param message [Object] message whose queued twin should be removed
# @return [TrueClass, FalseClass] true when a queued match was removed
def scrub_duplicate_message(message)
  messages do |collection|
    queued = collection[message.connection.identifier]
    idx = queued.index do |msg|
      msg.message_id == message.message_id
    end
    if idx
      queued.delete_at(idx)
      # The callback receives the message passed in, wrapped in an Array.
      consumer.send(removal_callback, [message]) if removal_callback
      true
    else
      false
    end
  end
end
#size ⇒ Integer
Returns number of queued messages.
109 110 111 112 113 |
# File 'lib/krakow/consumer/queue.rb', line 109
#
# NOTE(review): the `messages` receiver was missing from this listing and
# has been restored; confirm against lib/krakow/consumer/queue.rb.
#
# @return [Integer] number of queued messages
def size
  messages do |collection|
    # Seed the fold with 0 so an empty container yields 0 and not nil.
    collection.values.map(&:size).inject(0, :+)
  end
end
#validate_message(message) ⇒ Object
Validate message
137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/krakow/consumer/queue.rb', line 137
#
# Validate message.
#
# NOTE(review): the method name, the `message` parameter and the `else`
# value were missing from this listing and have been restored; confirm
# against lib/krakow/consumer/queue.rb.
#
# NOTE(review): as restored, the check compares the message's
# instance_stamp with itself plus the timeout, so it can only be true for a
# negative timeout. The left-hand side presumably should be the current
# time; verify before changing.
#
# @param message [Object] must respond to #instance_stamp and #connection
# @return [Object, nil] the message itself, or nil when it was discarded
def validate_message(message)
  # :msg_timeout is divided by 1000.0 before being added to the stamp.
  timeout = message.connection.endpoint_settings[:msg_timeout] / 1000.0
  if message.instance_stamp > message.instance_stamp + timeout
    warn "Message exceeded timeout! Discarding. (#{message})"
    consumer.send(removal_callback, [message]) if removal_callback
    nil
  else
    message
  end
end