Class: MicroQ::Queue::Default
- Inherits:
-
Object
- Object
- MicroQ::Queue::Default
- Includes:
- Celluloid
- Defined in:
- lib/micro_q/queue/default.rb
Overview
The default queue implementation. Handles messages that should be run immediately as well as messages that should be run at some specified time in the future.
When shutting down, if the MicroQ.config.queue_file is defined and accessible, the messages in the queue will be written for persistence.
Usage:
item = { ‘class’ => ‘MyWorker’, ‘args’ => [user.id] }
queue = MicroQ::Queue::Default.new queue.push(item) # asynchronous push (preferred) queue.sync_push(item) # synchronous push
queue.entries #=> [=> ‘MyWorker’, ‘args’ => [32]]
queue.push(item, :when => 15.minutes.from_now)
queue.later
- => 1359703628.38, ‘worker’ => {‘class’ => ‘MyWorker’, ‘args’ => 32}
Instance Attribute Summary collapse
-
#entries ⇒ Object
readonly
Returns the value of attribute entries.
-
#later ⇒ Object
readonly
Returns the value of attribute later.
Instance Method Summary collapse
-
#dequeue(limit = 30) ⇒ Object
Remove and return all available messages.
-
#initialize ⇒ Default
constructor
A new instance of Default.
-
#push(item, options = {}) ⇒ Object
Asynchronously push a message item to the queue.
-
#stop ⇒ Object
Stop the queue and store items for later.
-
#sync_push(item, options = {}) ⇒ Object
Asynchronously push a message item to the queue.
Constructor Details
#initialize ⇒ Default
Returns a new instance of Default.
32 33 34 35 36 37 |
# File 'lib/micro_q/queue/default.rb', line 32 def initialize @entries = [] @later = [] load_queues end |
Instance Attribute Details
#entries ⇒ Object (readonly)
Returns the value of attribute entries.
30 31 32 |
# File 'lib/micro_q/queue/default.rb', line 30 def entries @entries end |
#later ⇒ Object (readonly)
Returns the value of attribute later.
30 31 32 |
# File 'lib/micro_q/queue/default.rb', line 30 def later @later end |
Instance Method Details
#dequeue(limit = 30) ⇒ Object
Remove and return all available messages. Optionally give a limit and return only limit number of messages
74 75 76 77 78 79 80 81 82 |
# File 'lib/micro_q/queue/default.rb', line 74 def dequeue(limit = 30) return [] if limit == 0 opts = { :i => 0, :limit => limit} [].tap do |items| dequeue_entries!(items, opts) dequeue_later!(items, opts) end end |
#push(item, options = {}) ⇒ Object
Asynchronously push a message item to the queue.
42 43 44 |
# File 'lib/micro_q/queue/default.rb', line 42 def push(item, ={}) async.sync_push(item, ) end |
#stop ⇒ Object
Stop the queue and store items for later
87 88 89 90 91 92 93 |
# File 'lib/micro_q/queue/default.rb', line 87 def stop File.open(queue_file, 'w+') do |f| f.write(YAML.dump(entries)) end if queue_file? terminate end |
#sync_push(item, options = {}) ⇒ Object
Asynchronously push a message item to the queue. Either push it to the immediate portion of the queue or store it for after when it should be run with the :when option.
Options:
when: The time/timestamp after which to run the message.
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/micro_q/queue/default.rb', line 54 def sync_push(item, ={}) item, = MicroQ::Util.stringify(item, ) item['class'] = item['class'].to_s MicroQ.middleware.client.call(item, ) do if (time = ['when']) @later.push( 'when' => time.to_f, 'worker' => item ) else @entries.push(item) end end end |