Class: MicroQ::Queue::Sqs
- Inherits:
-
Object
- Object
- MicroQ::Queue::Sqs
- Includes:
- Celluloid
- Defined in:
- lib/micro_q/queue/sqs.rb
Instance Attribute Summary collapse
-
#entries ⇒ Object
readonly
Returns the value of attribute entries.
-
#fetchers ⇒ Object
readonly
Returns the value of attribute fetchers.
-
#later ⇒ Object
readonly
Returns the value of attribute later.
-
#messages ⇒ Object
Returns the value of attribute messages.
Class Method Summary collapse
Instance Method Summary collapse
- #dequeue(limit = 30) ⇒ Object
- #finished(item) ⇒ Object
-
#initialize ⇒ Sqs
constructor
A new instance of Sqs.
- #push(*args) ⇒ Object
- #receive_messages(*items) ⇒ Object
- #sync_push(item, options = {}) ⇒ Object
- #verify_queue(name) ⇒ Object
Constructor Details
#initialize ⇒ Sqs
Returns a new instance of Sqs.
11 12 13 14 15 16 17 18 |
# File 'lib/micro_q/queue/sqs.rb', line 11 def initialize @lock = Mutex.new @messages, @fetchers, @entries, @later = [], [], [], [] @fetcher_map = {} build_missing_fetchers end |
Instance Attribute Details
#entries ⇒ Object (readonly)
Returns the value of attribute entries.
9 10 11 |
# File 'lib/micro_q/queue/sqs.rb', line 9 def entries @entries end |
#fetchers ⇒ Object (readonly)
Returns the value of attribute fetchers.
9 10 11 |
# File 'lib/micro_q/queue/sqs.rb', line 9 def fetchers @fetchers end |
#later ⇒ Object (readonly)
Returns the value of attribute later.
9 10 11 |
# File 'lib/micro_q/queue/sqs.rb', line 9 def later @later end |
#messages ⇒ Object
Returns the value of attribute messages.
8 9 10 |
# File 'lib/micro_q/queue/sqs.rb', line 8 def @messages end |
Class Method Details
.shutdown! ⇒ Object
64 65 66 |
# File 'lib/micro_q/queue/sqs.rb', line 64 def self.shutdown! @shutdown = true end |
Instance Method Details
#dequeue(limit = 30) ⇒ Object
45 46 47 48 49 50 51 52 53 |
# File 'lib/micro_q/queue/sqs.rb', line 45 def dequeue(limit=30) return [] unless limit > 0 && .any? @lock.synchronize do limit.times.collect do .pop end.compact end end |
#finished(item) ⇒ Object
55 56 57 58 |
# File 'lib/micro_q/queue/sqs.rb', line 55 def finished(item) queue_name = verify_queue(item['queue']) @fetcher_map[queue_name].(item) end |
#push(*args) ⇒ Object
20 21 22 |
# File 'lib/micro_q/queue/sqs.rb', line 20 def push(*args) async.sync_push(*args) end |
#receive_messages(*items) ⇒ Object
39 40 41 42 43 |
# File 'lib/micro_q/queue/sqs.rb', line 39 def (*items) @lock.synchronize do (@messages += items).flatten! end end |
#sync_push(item, options = {}) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/micro_q/queue/sqs.rb', line 24 def sync_push(item, ={}) item, = MicroQ::Util.stringify(item, ) item['class'] = item['class'].to_s MicroQ.middleware.client.call(item, ) do args, queue_name = [item], verify_queue(item['queue']) if (time = ['when']) args << time.to_f end @fetcher_map[queue_name].(*args) end end |
#verify_queue(name) ⇒ Object
60 61 62 |
# File 'lib/micro_q/queue/sqs.rb', line 60 def verify_queue(name) QUEUES_KEYS.include?(name.to_s) ? name.to_s : 'default' end |