Class: MicroQ::Queue::Default

Inherits:
Object
  • Object
show all
Includes:
Celluloid
Defined in:
lib/micro_q/queue/default.rb

Overview

The default queue implementation. Handles messages that should be run immediately as well as messages that should be run at some specified time in the future.

When shutting down, if the MicroQ.config.queue_file is defined and accessible, the messages in the queue will be written for persistence.

Usage:

item = { ‘class’ => ‘MyWorker’, ‘args’ => [user.id] }

queue = MicroQ::Queue::Default.new queue.push(item) # asynchronous push (preferred) queue.sync_push(item) # synchronous push

queue.entries #=> [=> ‘MyWorker’, ‘args’ => [32]]

queue.push(item, :when => 15.minutes.from_now)

queue.later

=> 1359703628.38, ‘worker’ => {‘class’ => ‘MyWorker’, ‘args’ => 32}

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeDefault

Returns a new instance of Default.



32
33
34
35
36
37
# File 'lib/micro_q/queue/default.rb', line 32

def initialize
  @entries = []
  @later   = []

  load_queues
end

Instance Attribute Details

#entriesObject (readonly)

Returns the value of attribute entries.



30
31
32
# File 'lib/micro_q/queue/default.rb', line 30

def entries
  @entries
end

#laterObject (readonly)

Returns the value of attribute later.



30
31
32
# File 'lib/micro_q/queue/default.rb', line 30

def later
  @later
end

Instance Method Details

#dequeue(limit = 30) ⇒ Object

Remove and return all available messages. Optionally give a limit and return only limit number of messages



74
75
76
77
78
79
80
81
82
# File 'lib/micro_q/queue/default.rb', line 74

def dequeue(limit = 30)
  return [] if limit == 0

  opts = { :i => 0, :limit => limit}
  [].tap do |items|
    dequeue_entries!(items, opts)
    dequeue_later!(items, opts)
  end
end

#push(item, options = {}) ⇒ Object

Asynchronously push a message item to the queue.



42
43
44
# File 'lib/micro_q/queue/default.rb', line 42

def push(item, options={})
  async.sync_push(item, options)
end

#stopObject

Stop the queue and store items for later



87
88
89
90
91
92
93
# File 'lib/micro_q/queue/default.rb', line 87

def stop
  File.open(queue_file, 'w+') do |f|
    f.write(YAML.dump(entries))
  end if queue_file?

  terminate
end

#sync_push(item, options = {}) ⇒ Object

Asynchronously push a message item to the queue. Either push it to the immediate portion of the queue or store it for after when it should be run with the :when option.

Options:

when: The time/timestamp after which to run the message.


54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/micro_q/queue/default.rb', line 54

def sync_push(item, options={})
  item, options = MicroQ::Util.stringify(item, options)
  item['class'] = item['class'].to_s

  MicroQ.middleware.client.call(item, options) do
    if (time = options['when'])
      @later.push(
        'when' => time.to_f,
        'worker' => item
      )
    else
      @entries.push(item)
    end
  end
end