Class: MicroQ::Queue::Redis
- Inherits:
- Object
- Includes:
- Celluloid
- Defined in:
- lib/micro_q/queue/redis.rb
Constant Summary
- QUEUES =
{ :entries => 'micro_q:queue:entries', :later => 'micro_q:queue:later' }.freeze
Instance Method Summary
- #dequeue(limit = 30) ⇒ Object
Remove and return up to :limit number of items from the entries queue.
- #entries ⇒ Object
All of the items in the entries queue.
- #later ⇒ Object
All of the items in the later queue.
- #push(item, options = {}) ⇒ Object
Asynchronously push a message item to the queue.
- #sync_push(item, options = {}) ⇒ Object
Instance Method Details
#dequeue(limit = 30) ⇒ Object
Remove and return up to :limit items from the entries queue. Any available items from the later queue are moved into the entries queue so that a future dequeue can return them.
# File 'lib/micro_q/queue/redis.rb', line 64
def dequeue(limit = 30)
  idx, items = 0, []

  fetch(limit).tap do |(e, later)|
    e.each {|item| items << item unless (idx += 1) > limit }

    MicroQ.redis do |r|
      ((e - items) + later).each {|l| r.rpush(QUEUES[:entries], l)}
    end
  end

  items.collect(&MicroQ::Util.json_parse)
end
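The split performed by #dequeue can be sketched without Redis. This is a minimal illustration, not the library's code: `split_batch` is a hypothetical helper, and its two array arguments stand in for what the private `fetch` method is assumed to return (raw entries and due later items).

```ruby
# Hypothetical helper (not part of MicroQ): models how a fetched batch
# is divided between the caller and the entries queue.
# Assumption: `entries` and `due_later` are plain arrays of JSON strings.
def split_batch(entries, due_later, limit = 30)
  taken     = entries.take(limit)   # items returned to the caller
  remainder = entries.drop(limit)   # items beyond the limit
  [taken, remainder + due_later]    # second array would be re-queued
end

taken, requeued = split_batch(%w[a b c], %w[d], 2)
```

Here `taken` holds the first two items and `requeued` holds the leftover entry followed by the due later item. The sketch uses `drop` rather than the `e - items` array difference seen in the listing, because Ruby's `Array#-` removes every element equal to one already taken, which would also discard duplicate payloads in the remainder.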
#entries ⇒ Object
All of the items in the entries queue
# File 'lib/micro_q/queue/redis.rb', line 14
def entries
  MicroQ.redis do |r|
    r.lrange(QUEUES[:entries], 0, -1)
  end.collect(&MicroQ::Util.json_parse)
end
#later ⇒ Object
All of the items in the later queue
# File 'lib/micro_q/queue/redis.rb', line 23
def later
  MicroQ.redis do |r|
    r.zrangebyscore(QUEUES[:later], '-inf', '+inf')
  end.collect(&MicroQ::Util.json_parse)
end
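To illustrate what a score-range query over the later queue returns, the following sketch models a sorted set as an array of [score, member] pairs. `range_by_score` is a hypothetical stand-in for ZRANGEBYSCORE, and the idea that due items are those whose score is at or below a cutoff time is an assumption, since the private `fetch` method is not shown here.

```ruby
# Hypothetical in-memory stand-in for a score-range query on a sorted set.
# `pairs` is an array of [score, member]; results come back in ascending
# score order. The default bounds model '-inf' and '+inf'.
def range_by_score(pairs, min = -Float::INFINITY, max = Float::INFINITY)
  pairs.select { |score, _member| score >= min && score <= max }
       .sort_by(&:first)
       .map(&:last)
end

later = [[30.0, 'c'], [10.0, 'a'], [20.0, 'b']]

all_items = range_by_score(later)                          # unbounded, like #later
due_items = range_by_score(later, -Float::INFINITY, 20.0)  # assumed "due" cutoff
```

With unbounded limits every member is returned, which matches the behavior documented for #later. Narrowing the upper bound to a timestamp is one plausible way to select only the messages whose :when time has passed.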
#push(item, options = {}) ⇒ Object
Asynchronously push a message item to the queue. Either push it to the immediate portion of the queue (a Redis list) or store it in the later queue (a Redis sorted-set). The message will be available for dequeue after the :when time passes.
Options:
when: The time/timestamp after which to run the message.
# File 'lib/micro_q/queue/redis.rb', line 38
def push(item, options = {})
  async.sync_push(item, options)
end
#sync_push(item, options = {}) ⇒ Object
# File 'lib/micro_q/queue/redis.rb', line 42
def sync_push(item, options = {})
  item, options = MicroQ::Util.stringify(item, options)
  item['class'] = item['class'].to_s

  MicroQ.middleware.client.call(item, options) do
    json = JSON.dump(item)

    MicroQ.redis do |r|
      if (time = options['when'])
        r.zadd(QUEUES[:later], time.to_f, json)
      else
        r.lpush(QUEUES[:entries], json)
      end
    end
  end
end
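The routing between the immediate list and the later sorted set can be sketched in memory. `InMemoryQueue` is a hypothetical class written for illustration only: it skips the middleware chain and key stringification, and it models the Redis list and sorted set with plain arrays.

```ruby
require 'json'

# Hypothetical in-memory model of the two queue structures (not MicroQ code).
class InMemoryQueue
  attr_reader :entries, :later

  def initialize
    @entries = [] # models the Redis list of immediate messages
    @later   = [] # models the sorted set as [score, json] pairs
  end

  # Serialize the item, then route it by the presence of the 'when' option.
  def sync_push(item, options = {})
    json = JSON.dump(item)

    if (time = options['when'])
      @later << [time.to_f, json]   # score is the run-after time
      @later.sort_by!(&:first)      # keep ascending score order
    else
      @entries.unshift(json)        # head insert, like LPUSH
    end
  end
end

queue = InMemoryQueue.new
queue.sync_push({ 'class' => 'MyWorker', 'args' => [1] })
queue.sync_push({ 'class' => 'MyWorker', 'args' => [2] }, 'when' => 1_700_000_000)
```

After these two calls the first message sits in `entries` and the second sits in `later` with its timestamp as a float score. The worker name `MyWorker` and the timestamp value are placeholders.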