Module: BackgroundQueue::ServerLib::QueueRegistry
- Included in:
- BalancedQueue, Owner
- Defined in:
- lib/background_queue/server_lib/queue_registry.rb
Instance Method Summary collapse
- #add_item(item) ⇒ Object
- #finish_item(item) ⇒ Object
- #next_item ⇒ Object
- #remove_item(item) ⇒ Object
Instance Method Details
#add_item(item) ⇒ Object
4 5 6 7 8 9 10 11 12 13 14 15 |
# File 'lib/background_queue/server_lib/queue_registry.rb', line 4

# Records +item+ against its queue and then repositions or resumes that
# queue within this registry.
#
# @param item [Object] the item being added; its queue id is derived via
#   get_queue_id_from_item
# @return [Object, nil] whatever push / resume_queue returns, or nil when
#   neither action is taken
def add_item(item)
  queue_id = get_queue_id_from_item(item)
  # NOTE(review): the `true` flag presumably asks get_queue to create the
  # queue when it is missing -- confirm against the including class.
  already_registered, queue = get_queue(queue_id, true)
  raised, previous_priority = track_priority_when_adding_to_queue(queue, item)

  if queue.stalled?
    # A stalled queue is resumed, except when it is synchronous and still
    # has running items.
    resume_queue(queue) unless queue.synchronous? && queue.has_running_items?
  elsif raised || !already_registered
    # Drop the entry filed under the old priority before pushing again.
    remove(queue, previous_priority) if already_registered
    push(queue)
  end
end
#finish_item(item) ⇒ Object
52 53 54 55 56 57 58 59 |
# File 'lib/background_queue/server_lib/queue_registry.rb', line 52

# Marks +item+ as finished on its queue, decrements the running-item
# counter and resumes the queue where permitted.
#
# @param item [Object] the item that has finished running
# @return [Object, nil] the result of resume_queue, or nil when the queue
#   is synchronous and still has running items
# @raise [RuntimeError] when no queue is found for the item
def finish_item(item)
  _registered, queue = get_queue(get_queue_id_from_item(item), false)
  if queue.nil?
    raise "Queue #{get_queue_id_from_item(item)} unavailble when finishing item"
  end

  queue.finish_item(item)
  @running_items -= 1

  # A synchronous queue stays put while it still has running items.
  must_wait = queue.synchronous? && queue.has_running_items?
  resume_queue(queue) unless must_wait
end
#next_item ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/background_queue/server_lib/queue_registry.rb', line 17

# Pops the next queue from this registry and takes that queue's next item.
#
# @return [Object, nil] the item to run, or nil when pop yields no queue
def next_item
  queue = pop
  return nil if queue.nil?

  _lowered, _previous_priority, item = remove_item_from_queue(queue, :next)

  if !queue.empty? && !queue.synchronous?
    push(queue)
  else
    # Synchronous queues must not be handed out again until the current
    # task finishes. An empty queue is likewise stalled rather than
    # discarded, in case the running task queues more work against it.
    @items.delete(queue.id)
    stall_queue(queue)
  end

  # NOTE(review): the counter is bumped even when item is nil -- confirm
  # that remove_item_from_queue can never return a nil item here.
  @running_items += 1
  item.running = true unless item.nil?
  item
end
#remove_item(item) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/background_queue/server_lib/queue_registry.rb', line 38

# Clears the running flag on +item+, removes it from its queue and then
# drops or re-files that queue in this registry as needed.
#
# @param item [Object] the item to remove; must respond to running=, id
#   and priority
# @return [Object, nil] the result of @items.delete or push, or nil when
#   the queue needs no repositioning
# @raise [RuntimeError] when no queue is found for the item
def remove_item(item)
  item.running = false

  _registered, queue = get_queue(get_queue_id_from_item(item), false)
  if queue.nil?
    raise "Unable to remove task #{item.id} at priority #{item.priority} (no queue at that priority)"
  end

  lowered, previous_priority, _removed = remove_item_from_queue(queue, item)

  now_empty = queue.empty?
  # Nothing to do unless the queue emptied or its priority dropped.
  return nil unless now_empty || lowered

  # In both remaining cases the entry under the old priority goes first.
  remove(queue, previous_priority)
  if now_empty
    @items.delete(queue.id)
  else
    push(queue)
  end
end