Class: ExceptionalSynchrony::LimitedWorkQueue
- Inherits:
-
Object
- Object
- ExceptionalSynchrony::LimitedWorkQueue
- Defined in:
- lib/exceptional_synchrony/limited_work_queue.rb
Instance Method Summary collapse
-
#add!(proc = nil, &block) ⇒ Object
(also: #add)
Adds a job (a proc or block that responds to #call) to the queue and starts working the queue unless it is paused.
-
#initialize(em, limit) ⇒ LimitedWorkQueue
constructor
A new instance of LimitedWorkQueue.
- #items ⇒ Object
- #pause! ⇒ Object
- #paused? ⇒ Boolean
- #queue_empty? ⇒ Boolean
- #unpause! ⇒ Object
- #work! ⇒ Object
- #workers_empty? ⇒ Boolean
- #workers_full? ⇒ Boolean
Constructor Details
#initialize(em, limit) ⇒ LimitedWorkQueue
Returns a new instance of LimitedWorkQueue.
4 5 6 7 8 9 10 11 |
# File 'lib/exceptional_synchrony/limited_work_queue.rb', line 4
# Builds an empty, unpaused queue with no workers running.
#
# @param em [Object] kept in @em; none of the methods shown here read it
# @param limit [Integer] upper bound that workers_full? compares the worker
#   count against; must be greater than zero (assumed Integer, only "> 0" is checked)
# @raise [ArgumentError] when limit is not greater than zero
def initialize(em, limit)
  @em = em
  raise ArgumentError, "limit must be positive" unless limit > 0

  @limit = limit
  @worker_count = 0
  @job_procs = []
  @paused = false
end
Instance Method Details
#add!(proc = nil, &block) ⇒ Object Also known as: add
Adds a job (a proc or block that responds to #call) to the queue and starts working the queue unless it is paused.
14 15 16 17 18 19 20 21 22 23 |
# File 'lib/exceptional_synchrony/limited_work_queue.rb', line 14
# Enqueues a job and, unless the queue is paused, starts working the queue.
#
# The job is the positional +proc+ when given, otherwise the block.
# When the queue already holds jobs and the new job responds to +merge+, the
# job is offered the current queue via job.merge(queue); a truthy result
# replaces the whole queue, a falsy result falls back to a plain append.
#
# @param proc [#call, nil] job to enqueue; takes precedence over the block
# @param block [Proc] job to enqueue when +proc+ is nil
# @raise [RuntimeError] when the chosen job does not respond to +call+
# @return [Object, nil] the result of work!, or nil when paused
def add!(proc = nil, &block)
  job = proc || block
  raise "Must respond_to?(:call)! #{job.inspect}" unless job.respond_to?(:call)

  # Only consult merge when there is something queued to merge with.
  merged_queue = job.merge(@job_procs) if @job_procs.any? && job.respond_to?(:merge)

  if merged_queue
    @job_procs = merged_queue
  else
    @job_procs << job
  end

  work! unless paused?
end
#items ⇒ Object
50 51 52 |
# File 'lib/exceptional_synchrony/limited_work_queue.rb', line 50
# Exposes the jobs that are still waiting to be worked.
#
# @return [Object] the internal @job_procs collection itself, not a copy, so
#   any mutation by the caller is seen by the queue
def items
  @job_procs
end
#pause! ⇒ Object
42 43 44 |
# File 'lib/exceptional_synchrony/limited_work_queue.rb', line 42
# Marks the queue as paused. While paused, add! enqueues jobs without
# calling work!.
#
# @return [true]
def pause!
  @paused = true
end
#paused? ⇒ Boolean
38 39 40 |
# File 'lib/exceptional_synchrony/limited_work_queue.rb', line 38
# Reports the pause flag as last set by initialize, pause! or unpause!.
#
# @return [Boolean] the current value of @paused
def paused?
  @paused
end
#queue_empty? ⇒ Boolean
34 35 36 |
# File 'lib/exceptional_synchrony/limited_work_queue.rb', line 34
# Tells whether any jobs are still waiting to be started.
#
# @return [Boolean] true when @job_procs holds no jobs
def queue_empty?
  @job_procs.empty?
end
#unpause! ⇒ Object
46 47 48 |
# File 'lib/exceptional_synchrony/limited_work_queue.rb', line 46
# Clears the pause flag. This only flips the flag; it does not itself call
# work!, so jobs queued while paused stay queued until work! is next invoked.
#
# @return [false]
def unpause!
  @paused = false
end
#work! ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/exceptional_synchrony/limited_work_queue.rb', line 54
# Starts queued jobs until the queue is drained or the worker limit is hit.
#
# Each job is shifted off the front of the queue, counted as a worker, and
# run inside its own Fiber which is resumed immediately. A StandardError
# raised by the job is logged through ExceptionHandling.log_error rather than
# propagated. worker_done always runs when the job finishes or fails; it is
# defined outside this excerpt (presumably it releases the worker slot --
# confirm against the full file).
#
# @return [nil]
def work!
  while !queue_empty? && !workers_full?
    next_job = @job_procs.shift
    @worker_count += 1

    worker = Fiber.new do
      begin
        next_job.call
      rescue => error
        ExceptionHandling.log_error(error, "LimitedWorkQueue encountered an exception")
      ensure
        worker_done
      end
    end
    worker.resume
  end
end
#workers_empty? ⇒ Boolean
26 27 28 |
# File 'lib/exceptional_synchrony/limited_work_queue.rb', line 26
# Tells whether no workers are currently counted as running.
#
# @return [Boolean] true when @worker_count is zero
def workers_empty?
  @worker_count.zero?
end
#workers_full? ⇒ Boolean
30 31 32 |
# File 'lib/exceptional_synchrony/limited_work_queue.rb', line 30
# Tells whether the running worker count has reached the configured limit.
#
# @return [Boolean] true when @worker_count is at or above @limit
def workers_full?
  @worker_count >= @limit
end