Class: Gitlab::BatchPopQueueing
- Inherits:
-
Object
- Object
- Gitlab::BatchPopQueueing
- Defined in:
- lib/gitlab/batch_pop_queueing.rb
Overview
This class is a queuing system for processing expensive tasks in an atomic manner with batch poping to let you optimize the total processing time.
In usual queuing system, the first item started being processed immediately and the following items wait until the next items have been popped from the queue. On the other hand, this queueing system, the former part is same, however, it pops the enqueued items as batch. This is especially useful when you want to drop redandant items from the queue in order to process important items only, thus it's more efficient than the traditional queueing system.
Caveats:
-
The order of the items are not guaranteed because of `sadd` (Redis Sets).
Example: “` class TheWorker
def perform
result = Gitlab::BatchPopQueueing.new('feature', 'queue').safe_execute([item]) do |items_in_queue|
item = extract_the_most_important_item_from(items_in_queue)
expensive_process(item)
end
if result[:status] == :finished && result[:new_items].present?
item = extract_the_most_important_item_from(items_in_queue)
TheWorker.perform_async(item.id)
end
end
end “`
Constant Summary collapse
- EXTRA_QUEUE_EXPIRE_WINDOW =
1.hour
- MAX_COUNTS_OF_POP_ALL =
1000
Instance Attribute Summary collapse
-
#namespace ⇒ Object
readonly
Returns the value of attribute namespace.
-
#queue_id ⇒ Object
readonly
Returns the value of attribute queue_id.
Instance Method Summary collapse
-
#initialize(namespace, queue_id) ⇒ Boolean
constructor
Initialize queue.
-
#safe_execute(new_items, lock_timeout: 10.minutes, &block) ⇒ Hash
Execute the given block in an exclusive lock.
Constructor Details
#initialize(namespace, queue_id) ⇒ Boolean
Initialize queue
46 47 48 49 50 |
# File 'lib/gitlab/batch_pop_queueing.rb', line 46 def initialize(namespace, queue_id) raise ArgumentError if namespace.empty? || queue_id.empty? @namespace, @queue_id = namespace, queue_id end |
Instance Attribute Details
#namespace ⇒ Object (readonly)
Returns the value of attribute namespace
36 37 38 |
# File 'lib/gitlab/batch_pop_queueing.rb', line 36 def namespace @namespace end |
#queue_id ⇒ Object (readonly)
Returns the value of attribute queue_id
36 37 38 |
# File 'lib/gitlab/batch_pop_queueing.rb', line 36 def queue_id @queue_id end |
Instance Method Details
#safe_execute(new_items, lock_timeout: 10.minutes, &block) ⇒ Hash
Execute the given block in an exclusive lock. If there is the other thread has already working on the block, it enqueues the items without processing the block.
NOTE: If an exception is raised in the block, the poppped items will not be recovered.
We should NOT re-enqueue the items in this case because it could end up in an infinite loop.
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/gitlab/batch_pop_queueing.rb', line 65 def safe_execute(new_items, lock_timeout: 10.minutes, &block) enqueue(new_items, lock_timeout + EXTRA_QUEUE_EXPIRE_WINDOW) lease = Gitlab::ExclusiveLease.new(lock_key, timeout: lock_timeout) return { status: :enqueued } unless uuid = lease.try_obtain begin all_args = pop_all yield all_args if block_given? { status: :finished, new_items: peek_all } ensure Gitlab::ExclusiveLease.cancel(lock_key, uuid) end end |