Class: FastlaneCore::QueueWorker
- Inherits:
-
Object
- Object
- FastlaneCore::QueueWorker
- Defined in:
- fastlane_core/lib/fastlane_core/queue_worker.rb
Overview
This dispatches jobs to worker threads and make it work in parallel. It’s suitable for I/O bounds works and not for CPU bounds works. Use this when you have all the items that you’ll process in advance. Simply enqueue them to this and call ‘QueueWorker#start`.
Constant Summary collapse
- NUMBER_OF_THREADS =
FastlaneCore::Helper.test? ? 1 : [ENV["DELIVER_NUMBER_OF_THREADS"], ENV["FL_NUMBER_OF_THREADS"], 10].map(&:to_i).find(&:positive?).clamp(1, ENV.fetch("FL_MAX_NUMBER_OF_THREADS", 10).to_i)
Instance Method Summary collapse
- #batch_enqueue(jobs) ⇒ Object
- #enqueue(job) ⇒ Object
-
#initialize(concurrency = NUMBER_OF_THREADS, &block) ⇒ QueueWorker
constructor
A new instance of QueueWorker.
-
#start ⇒ Object
Call this after you enqueuned all the jobs you want to process This method blocks current thread until all the enqueued jobs are processed.
Constructor Details
#initialize(concurrency = NUMBER_OF_THREADS, &block) ⇒ QueueWorker
Returns a new instance of QueueWorker.
13 14 15 16 17 |
# File 'fastlane_core/lib/fastlane_core/queue_worker.rb', line 13 def initialize(concurrency = NUMBER_OF_THREADS, &block) @concurrency = concurrency @block = block @queue = Queue.new end |
Instance Method Details
#batch_enqueue(jobs) ⇒ Object
25 26 27 28 |
# File 'fastlane_core/lib/fastlane_core/queue_worker.rb', line 25 def batch_enqueue(jobs) raise(ArgumentError, "Enqueue Array instead of #{jobs.class}") unless jobs.kind_of?(Array) jobs.each { |job| enqueue(job) } end |
#enqueue(job) ⇒ Object
20 21 22 |
# File 'fastlane_core/lib/fastlane_core/queue_worker.rb', line 20 def enqueue(job) @queue.push(job) end |
#start ⇒ Object
Call this after you enqueuned all the jobs you want to process This method blocks current thread until all the enqueued jobs are processed
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'fastlane_core/lib/fastlane_core/queue_worker.rb', line 32 def start @queue.close threads = [] @concurrency.times do threads << Thread.new do job = @queue.pop while job @block.call(job) job = @queue.pop end end end threads.each(&:join) end |