Class: MicroQ::Queue::Redis

Inherits:
Object
  • Object
show all
Includes:
Celluloid
Defined in:
lib/micro_q/queue/redis.rb

Constant Summary collapse

# Redis keys used by this queue.
#   :entries - list holding messages that are ready to run (see lpush/lrange usage).
#   :later   - sorted set holding scheduled messages, scored by their run time (see zadd usage).
QUEUES = {
  entries: 'micro_q:queue:entries',
  later: 'micro_q:queue:later'
}.freeze

Instance Method Summary collapse

Instance Method Details

#dequeue(limit = 30) ⇒ Object

Remove and return up to `limit` items from the entries queue. Any items in the later queue that have become available are moved into the entries queue so that a future dequeue can return them.



64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/micro_q/queue/redis.rb', line 64

# Remove and return up to +limit+ parsed items from the entries queue.
#
# +fetch+ is defined outside this snippet; it is assumed to return a pair
# [ready, due] of raw JSON strings (ready entries, then later-queue items
# that are now available) -- confirm against its definition.
#
# Ready entries beyond +limit+, followed by the due items, are appended to
# the entries list so a subsequent dequeue can pick them up.
#
# @param limit [Integer] maximum number of items to return
# @return [Array] the dequeued items, each passed through MicroQ::Util.json_parse
def dequeue(limit = 30)
  ready, due = fetch(limit)

  # Split by position rather than with Array#-: set difference would discard
  # every over-limit entry whose JSON equals one of the returned items,
  # silently losing duplicate messages.
  keep = [limit, 0].max
  items = ready.first(keep)
  overflow = ready.drop(keep)

  MicroQ.redis do |r|
    (overflow + due).each { |json| r.rpush(QUEUES[:entries], json) }
  end

  items.collect(&MicroQ::Util.json_parse)
end

#entries ⇒ Object

All of the items in the entries queue



14
15
16
17
18
# File 'lib/micro_q/queue/redis.rb', line 14

# All of the items currently in the entries queue.
#
# Reads the whole Redis list stored at QUEUES[:entries] (range 0..-1) and
# runs every raw element through MicroQ::Util.json_parse.
#
# @return [Array] the parsed entries, in list order
def entries
  raw = MicroQ.redis { |conn| conn.lrange(QUEUES[:entries], 0, -1) }
  raw.map(&MicroQ::Util.json_parse)
end

#later ⇒ Object

All of the items in the later queue



23
24
25
26
27
# File 'lib/micro_q/queue/redis.rb', line 23

# All of the items currently in the later queue.
#
# Reads every member of the Redis sorted set at QUEUES[:later] (scores from
# -inf to +inf) and runs each raw element through MicroQ::Util.json_parse.
#
# @return [Array] the parsed scheduled items, in the order Redis returns them
def later
  raw = MicroQ.redis { |conn| conn.zrangebyscore(QUEUES[:later], '-inf', '+inf') }
  raw.map(&MicroQ::Util.json_parse)
end

#push(item, options = {}) ⇒ Object

Asynchronously push a message item to the queue. Either push it to the immediate portion of the queue (a Redis list) or store it in the later queue (a Redis sorted-set). The message will be available for dequeue after the :when time passes.

Options:

when: The time/timestamp after which to run the message.


38
39
40
# File 'lib/micro_q/queue/redis.rb', line 38

# Push a message onto the queue without waiting for the write.
#
# Hands the work to #sync_push through +async+ (presumably the Celluloid
# async proxy, since the class includes Celluloid -- confirm).
#
# @param item [Hash] the message to enqueue
# @param options [Hash] enqueue options; :when is the time/timestamp after
#   which the message should run
# @return [Object] whatever the async call to sync_push returns
def push(item, options = {})
  proxy = async
  proxy.sync_push(item, options)
end

#sync_push(item, options = {}) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/micro_q/queue/redis.rb', line 42

# Synchronously push a message onto the queue.
#
# The item and options are normalised with MicroQ::Util.stringify and the
# item's 'class' is coerced to a String. The write happens inside the client
# middleware chain: when options['when'] is set the JSON payload is added to
# the later sorted set scored by that time (as a Float); otherwise it is
# pushed onto the entries list.
#
# @param item [Hash] the message to enqueue
# @param options [Hash] enqueue options; 'when' schedules the message
# @return [Object] the result of the client middleware call
def sync_push(item, options = {})
  item, options = MicroQ::Util.stringify(item, options)
  item['class'] = item['class'].to_s

  MicroQ.middleware.client.call(item, options) do
    payload = JSON.dump(item)
    run_at = options['when']

    MicroQ.redis do |conn|
      # No schedule given: the message goes straight onto the ready list.
      next conn.lpush(QUEUES[:entries], payload) unless run_at

      conn.zadd(QUEUES[:later], run_at.to_f, payload)
    end
  end
end