Class: CLI::UI::WorkQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/cli/ui/work_queue.rb

Defined Under Namespace

Classes: Future

Instance Method Summary collapse

Constructor Details

#initialize(max_concurrent) ⇒ WorkQueue

: (Integer max_concurrent) -> void



68
69
70
71
72
73
74
# File 'lib/cli/ui/work_queue.rb', line 68

# Sets up an empty work queue together with its synchronization primitives.
# No worker threads are created here; the worker list starts out empty.
#
# @param max_concurrent [Integer] upper bound on the number of worker
#   threads (compared against the worker count when enqueuing)
def initialize(max_concurrent)
  # Synchronization primitives.
  @mutex = Mutex.new #: Mutex
  @condition = ConditionVariable.new #: ConditionVariable

  # Pending work.
  @queue = Queue.new #: Queue

  # Worker bookkeeping.
  @max_concurrent = max_concurrent
  @workers = [] #: Array[Thread]
end

Instance Method Details

#close ⇒ Object

: -> void



87
88
89
# File 'lib/cli/ui/work_queue.rb', line 87

# Closes the underlying queue so that no further items can be pushed onto it.
# This method only closes the queue; it does not join or otherwise touch the
# worker threads.
def close
  @queue.close
end

#enqueue(&block) ⇒ Object

: { -> untyped } -> Future



77
78
79
80
81
82
83
84
# File 'lib/cli/ui/work_queue.rb', line 77

# Schedules +block+ for execution and hands back a Future for it.
#
# A new worker is started first (under the mutex) as long as fewer than
# +@max_concurrent+ workers exist; the [future, block] pair is then pushed
# onto the queue.
#
# @yield the work to run later
# @return [Future] the future that was queued alongside the block
def enqueue(&block)
  Future.new.tap do |pending|
    # Checking the worker count and starting a worker happen in one
    # critical section.
    @mutex.synchronize do
      start_worker if @workers.size < @max_concurrent
    end

    @queue.push([pending, block])
  end
end

#interrupt ⇒ Object

: -> void



98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/cli/ui/work_queue.rb', line 98

# Aborts all outstanding work.
#
# While holding the mutex this method:
# 1. closes the queue,
# 2. fails every task still sitting in the queue with an Interrupt,
# 3. raises Interrupt in every worker thread that is still alive,
# 4. joins all workers and clears the worker list.
#
# @return [void]
def interrupt
  @mutex.synchronize do
    @queue.close

    # Fail any remaining tasks in the queue.
    #
    # A closed Queue can still be popped by other threads, so checking
    # `empty?` and then calling `pop(true)` is racy: the last item may be
    # taken in between, which makes the non-blocking pop raise ThreadError.
    # Pop until that error instead and treat it as "queue drained".
    loop do
      future, _block = begin
        @queue.pop(true)
      rescue ThreadError
        break
      end
      future&.fail(Interrupt.new)
    end

    # Interrupt all worker threads
    @workers.each { |worker| worker.raise(Interrupt) if worker.alive? }
    @workers.each(&:join)
    @workers.clear
  end
end

#wait ⇒ Object

: -> void



92
93
94
95
# File 'lib/cli/ui/work_queue.rb', line 92

# Closes the queue and then blocks until every worker thread has finished.
#
# @return [void]
def wait
  @queue.close
  @workers.each do |worker|
    worker.join
  end
end