Class: Kronk::QueueRunner
- Inherits:
-
Object
- Object
- Kronk::QueueRunner
- Defined in:
- lib/kronk/queue_runner.rb
Overview
A basic queue and input processor that runs multi-threaded.
Input is optional and specified by creating an input trigger (passing a block to on(:input)). Input will be used to fill queue as the queue gets depleted by being processed.
qrunner = QueueRunner.new
qrunner.concurrency = 20 # thread count
qrunner.number = 100 # process 100 queue items
file = File.open "example.log", "r"
qrunner.on :input do
if file.eof?
qrunner.finish
else
file.readline
end
end
qrunner.on :complete do
file.close
puts "DONE!"
end
# If running in multi-threaded mode, item mutex will also be passed
# as optional second argument.
qrunner.run do |queue_item|
# Do something with item.
end
Additionally, the :interrupt trigger may be used to handle behavior when SIGINT is sent to the process.
qrunner.on :interrupt do
qrunner.kill
puts "Caught SIGINT"
exit 1
end
The :result trigger may also be used to perform actions with the return value of the block given to QueueRunner#run. This is useful for post-processing data without affecting concurrency as it will be run in a separate thread.
qrunner.on :result do |result|
p result
end
Direct Known Subclasses
Instance Attribute Summary collapse
-
#count ⇒ Object
Returns the value of attribute count.
-
#number ⇒ Object
Returns the value of attribute number.
-
#queue ⇒ Object
Returns the value of attribute queue.
-
#reader_thread ⇒ Object
Returns the value of attribute reader_thread.
-
#threads ⇒ Object
Returns the value of attribute threads.
Instance Method Summary collapse
-
#concurrently(concurrency = 1, &block) ⇒ Object
Process the queue and read from IO if available.
-
#finish ⇒ Object
Stop runner processing gracefully.
-
#finished? ⇒ Boolean
Returns true if processing queue should be stopped, otherwise false.
-
#initialize(opts = {}) ⇒ QueueRunner
constructor
Create a new QueueRunner for batch multi-threaded processing.
-
#kill ⇒ Object
Immediately end all runner processing and threads.
-
#on(trigger_name, &block) ⇒ Object
Specify a block to run for a given trigger name.
-
#periodically(period = 0.01, clump = 1, &block) ⇒ Object
Process the queue with periodic timer and a given period in seconds.
-
#start_input!(max_queue = @max_queue_size) ⇒ Object
Attempt to fill the queue by reading from the IO instance.
-
#stop_input! ⇒ Object
Permanently stop input reading by killing the reader thread for a given QueueRunner#run or QueueRunner#process_queue session.
-
#trigger(name, *args) ⇒ Object
Run a previously defined callback.
-
#until_finished ⇒ Object
Loop and read from input continually until finished.
-
#yield_queue_item ⇒ Object
Shifts one item off the queue and yields it to the given block.
Constructor Details
#initialize(opts = {}) ⇒ QueueRunner
Create a new QueueRunner for batch multi-threaded processing. Supported options are:
- :number
-
Fixnum - Total number of items to process
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/kronk/queue_runner.rb', line 63 def initialize opts={} @number = opts[:number] @count = 0 @queue = [] @threads = [] @rthreads = [] @max_queue_size = 100 @reader_thread = nil @triggers = {} @qmutex = Mutex.new end |
Instance Attribute Details
#count ⇒ Object
Returns the value of attribute count.
56 57 58 |
# File 'lib/kronk/queue_runner.rb', line 56 def count @count end |
#number ⇒ Object
Returns the value of attribute number.
56 57 58 |
# File 'lib/kronk/queue_runner.rb', line 56 def number @number end |
#queue ⇒ Object
Returns the value of attribute queue.
56 57 58 |
# File 'lib/kronk/queue_runner.rb', line 56 def queue @queue end |
#reader_thread ⇒ Object
Returns the value of attribute reader_thread.
56 57 58 |
# File 'lib/kronk/queue_runner.rb', line 56 def reader_thread @reader_thread end |
#threads ⇒ Object
Returns the value of attribute threads.
56 57 58 |
# File 'lib/kronk/queue_runner.rb', line 56 def threads @threads end |
Instance Method Details
#concurrently(concurrency = 1, &block) ⇒ Object
Process the queue and read from IO if available.
Yields queue item until queue and io (if available) are empty and the totaly number of requests to run is met (if number is set).
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/kronk/queue_runner.rb', line 139 def concurrently concurrency=1, &block @max_queue_size = concurrency * 2 until_finished do |count, active_count| if active_count >= concurrency || @queue.empty? Thread.pass next end num_threads = smaller_count(concurrency - active_count) num_threads.times do yield_queue_item(&block) end end end |
#finish ⇒ Object
Stop runner processing gracefully.
94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/kronk/queue_runner.rb', line 94 def finish stop_input! @threads.each do |t| @rthreads << Thread.new(t.value){|value| trigger :result, value } end @rthreads.each(&:join) @threads.clear @rthreads.clear end |
#finished? ⇒ Boolean
Returns true if processing queue should be stopped, otherwise false.
83 84 85 86 87 88 |
# File 'lib/kronk/queue_runner.rb', line 83 def finished? return true if @number && @count >= @number @queue.empty? && @count > 0 && (!@reader_thread || !@reader_thread.alive?) end |
#kill ⇒ Object
Immediately end all runner processing and threads.
111 112 113 114 115 116 117 |
# File 'lib/kronk/queue_runner.rb', line 111 def kill stop_input! @threads.each{|t| t.kill} @rthreads.each{|t| t.kill} @threads.clear @rthreads.clear end |
#on(trigger_name, &block) ⇒ Object
Specify a block to run for a given trigger name. Supported triggers are:
- :complete
-
Called after queue and input have been fully processed.
- :input
-
Called every time the queue needs populating.
- :interrupt
-
Called when SIGINT is captured.
- :start
-
Called before queue starts being processed.
128 129 130 |
# File 'lib/kronk/queue_runner.rb', line 128 def on trigger_name, &block @triggers[trigger_name] = block end |
#periodically(period = 0.01, clump = 1, &block) ⇒ Object
Process the queue with periodic timer and a given period in seconds.
Yields queue item until queue and io (if available) are empty and the totaly number of requests to run is met (if number is set).
If clump is given, will fire clump number of requests at a time, which can modify how often requests are send. The overall period should however be close to the one specified.
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/kronk/queue_runner.rb', line 167 def periodically period=0.01, clump=1, &block @max_queue_size = 0.5 / period @max_queue_size = clump * 2 if @max_queue_size < (clump * 2) start = Time.now until_finished do |count, active_count| num_threads = 1 expected_count = ((Time.now - start) / period).ceil if count < expected_count num_threads = smaller_count(expected_count - count) else sleep_time = period * smaller_count(clump) sleep sleep_time end num_threads.times do yield_queue_item(&block) end end end |
#start_input!(max_queue = @max_queue_size) ⇒ Object
Attempt to fill the queue by reading from the IO instance. Starts a new thread and returns the thread instance.
258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 |
# File 'lib/kronk/queue_runner.rb', line 258 def start_input! max_queue=@max_queue_size return unless @triggers[:input] @reader_thread = Thread.new do begin loop do if max_queue && @queue.length >= max_queue Thread.pass next end while !max_queue || @queue.length < max_queue item = trigger(:input) @qmutex.synchronize{ @queue << item } end Thread.pass end rescue => e Thread.main.raise e end end end |
#stop_input! ⇒ Object
Permanently stop input reading by killing the reader thread for a given QueueRunner#run or QueueRunner#process_queue session.
287 288 289 290 |
# File 'lib/kronk/queue_runner.rb', line 287 def stop_input! Thread.pass @reader_thread && @reader_thread.kill end |
#trigger(name, *args) ⇒ Object
Run a previously defined callback. See QueueRunner#on.
296 297 298 299 |
# File 'lib/kronk/queue_runner.rb', line 296 def trigger name, *args t = @triggers[name] t && t.call(*args) end |
#until_finished ⇒ Object
Loop and read from input continually until finished. Yields total_count and active_count if passed a block.
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/kronk/queue_runner.rb', line 196 def until_finished old_trap = trap 'INT' do @stop_time = Time.now kill trigger(:interrupt) trap 'INT', old_trap Process.kill 'INT', Process.pid end @start_time = Time.now trigger :start start_input! @count = 0 until finished? @rthreads.delete_if{|t| !t.alive? && t.join } results = [] @threads.delete_if do |t| !t.alive? && results << t.value end @rthreads << Thread.new(results) do |values| values.each{|value| trigger :result, value } end unless results.empty? yield @count, @threads.count if block_given? end @stop_time = Time.now finish trigger :complete end |
#yield_queue_item ⇒ Object
Shifts one item off the queue and yields it to the given block.
239 240 241 242 243 244 245 246 247 248 249 250 251 |
# File 'lib/kronk/queue_runner.rb', line 239 def yield_queue_item until item = @qmutex.synchronize{ @queue.shift } or !@reader_thread.alive? Thread.pass end return unless item @threads << Thread.new(item) do |q_item| yield q_item if block_given? end @count += 1 end |