Class: ExceptionalSynchrony::LimitedWorkQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/exceptional_synchrony/limited_work_queue.rb

Instance Method Summary collapse

Constructor Details

#initialize(em, limit) ⇒ LimitedWorkQueue

Returns a new instance of LimitedWorkQueue.



4
5
6
7
8
9
10
11
# File 'lib/exceptional_synchrony/limited_work_queue.rb', line 4

# Builds an empty, unpaused work queue with no active workers.
#
# @param em [Object] stored as-is in @em; none of the methods visible here
#   read it (presumably an event-loop handle -- confirm against callers)
# @param limit [Numeric] worker cap that workers_full? compares
#   @worker_count against; must be greater than zero
# @raise [ArgumentError] when limit is not greater than zero
def initialize(em, limit)
  @em = em
  raise ArgumentError, "limit must be positive" unless limit > 0

  @limit = limit
  @paused = false
  @job_procs = []
  @worker_count = 0
end

Instance Method Details

#add!(proc = nil, &block) ⇒ Object Also known as: add

Adds a job_proc to work.



14
15
16
17
18
19
20
21
22
23
# File 'lib/exceptional_synchrony/limited_work_queue.rb', line 14

# Adds a job_proc to work.
#
# The job is the positional argument when given, otherwise the block. When
# jobs are already queued and the new job responds to +merge+, the job is
# handed the current queue; a truthy result replaces the queue wholesale,
# while a falsy result falls back to appending the job. Unless the queue is
# paused, work! is then called.
#
# @param proc [#call, nil] the job; takes precedence over the block
# @yield the job body, used only when +proc+ is nil
# @raise [RuntimeError] when the chosen job does not respond to +call+
# @return [Object, nil] nil while paused, otherwise whatever work! returns
def add!(proc = nil, &block)
  job = proc || block
  raise "Must respond_to?(:call)! #{job.inspect}" unless job.respond_to?(:call)

  # Only consult job.merge when there is something queued to merge with.
  merged_queue = job.merge(@job_procs) if @job_procs.any? && job.respond_to?(:merge)
  if merged_queue
    @job_procs = merged_queue
  else
    @job_procs << job
  end

  work! unless paused?
end

#items ⇒ Object



50
51
52
# File 'lib/exceptional_synchrony/limited_work_queue.rb', line 50

# Exposes the jobs that are queued but not yet started.
#
# @return [Object] the queue's own @job_procs collection, not a copy, so
#   mutations made by the caller are visible to the queue
def items
  @job_procs
end

#pause! ⇒ Object



42
43
44
# File 'lib/exceptional_synchrony/limited_work_queue.rb', line 42

# Marks the queue as paused by setting @paused to true. While paused, add!
# still enqueues jobs but skips its call to work!.
#
# @return [true]
def pause!
  @paused = true
end

#paused? ⇒ Boolean

Returns:

  • (Boolean)


38
39
40
# File 'lib/exceptional_synchrony/limited_work_queue.rb', line 38

# Reports the paused flag, which initialize sets to false and pause! /
# unpause! toggle.
#
# @return [Boolean] the current value of @paused
def paused?
  @paused
end

#queue_empty? ⇒ Boolean

Returns:

  • (Boolean)


34
35
36
# File 'lib/exceptional_synchrony/limited_work_queue.rb', line 34

# Tells whether any jobs are still waiting to be started. work! uses this as
# one of its two loop-exit conditions.
#
# @return [Boolean] true when @job_procs holds no jobs
def queue_empty?
  @job_procs.empty?
end

#unpause! ⇒ Object



46
47
48
# File 'lib/exceptional_synchrony/limited_work_queue.rb', line 46

# Clears the paused flag.
#
# Only @paused changes here: work! is not invoked, so jobs that were queued
# while paused are not started by this call itself.
#
# @return [false]
def unpause!
  @paused = false
end

#work! ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/exceptional_synchrony/limited_work_queue.rb', line 54

# Starts queued jobs until the queue is empty or the worker limit is reached.
#
# Each job is shifted off the front of @job_procs, counted in @worker_count,
# and run inside its own Fiber that is resumed immediately. A StandardError
# raised by the job is handed to ExceptionHandling.log_error instead of
# propagating, and worker_done runs afterwards whether the job succeeded or
# failed.
#
# NOTE(review): worker_done is defined outside this view; presumably it
# releases the worker slot taken here -- confirm.
#
# @return [nil]
def work!
  while !queue_empty? && !workers_full?
    next_job = @job_procs.shift
    @worker_count += 1

    runner = Fiber.new do
      begin
        next_job.call
      rescue => ex
        ExceptionHandling.log_error(ex, "LimitedWorkQueue encountered an exception")
      ensure
        worker_done
      end
    end
    runner.resume
  end
end

#workers_empty? ⇒ Boolean

Returns:

  • (Boolean)


26
27
28
# File 'lib/exceptional_synchrony/limited_work_queue.rb', line 26

# Tells whether no jobs are currently counted as running.
#
# @return [Boolean] true when @worker_count is zero
def workers_empty?
  @worker_count == 0
end

#workers_full? ⇒ Boolean

Returns:

  • (Boolean)


30
31
32
# File 'lib/exceptional_synchrony/limited_work_queue.rb', line 30

# Tells whether the number of running workers has reached the configured
# limit. work! stops starting new jobs once this is true.
#
# @return [Boolean] true when @worker_count is at or above @limit
def workers_full?
  @limit <= @worker_count
end