Class: Kronk::QueueRunner

Inherits:
Object
Defined in:
lib/kronk/queue_runner.rb

Overview

A basic queue and input processor that runs multi-threaded.

Input is optional and is specified by creating an input trigger (passing a block to on(:input)). The input trigger is called to refill the queue as items are processed and the queue is depleted.

qrunner = QueueRunner.new
qrunner.concurrency = 20  # thread count
qrunner.number = 100      # process 100 queue items

file = File.open "example.log", "r"

qrunner.on :input do
  if file.eof?
    qrunner.finish
  else
    file.readline
  end
end

qrunner.on :complete do
  file.close
  puts "DONE!"
end

# If running in multi-threaded mode, item mutex will also be passed
# as optional second argument.
qrunner.run do |queue_item|
  # Do something with item.
end

Additionally, the :interrupt trigger may be used to handle behavior when SIGINT is sent to the process.

qrunner.on :interrupt do
  qrunner.kill
  puts "Caught SIGINT"
  exit 1
end

The :result trigger may also be used to perform actions with the return value of the block given to QueueRunner#run. This is useful for post-processing data without affecting concurrency as it will be run in a separate thread.

qrunner.on :result do |result|
  p result
end

Direct Known Subclasses

Player

Constructor Details

#initialize(opts = {}) ⇒ QueueRunner

Create a new QueueRunner for batch multi-threaded processing. Supported options are:

:number

Fixnum - Total number of items to process



# File 'lib/kronk/queue_runner.rb', line 63

def initialize opts={}
  @number   = opts[:number]
  @count    = 0
  @queue    = []
  @threads  = []
  @rthreads = []

  @max_queue_size = 100

  @reader_thread = nil

  @triggers = {}

  @qmutex = Mutex.new
end

Instance Attribute Details

#count ⇒ Object

Returns the value of attribute count.



# File 'lib/kronk/queue_runner.rb', line 56

def count
  @count
end

#number ⇒ Object

Returns the value of attribute number.



# File 'lib/kronk/queue_runner.rb', line 56

def number
  @number
end

#queue ⇒ Object

Returns the value of attribute queue.



# File 'lib/kronk/queue_runner.rb', line 56

def queue
  @queue
end

#reader_thread ⇒ Object

Returns the value of attribute reader_thread.



# File 'lib/kronk/queue_runner.rb', line 56

def reader_thread
  @reader_thread
end

#threads ⇒ Object

Returns the value of attribute threads.



# File 'lib/kronk/queue_runner.rb', line 56

def threads
  @threads
end

Instance Method Details

#concurrently(concurrency = 1, &block) ⇒ Object

Process the queue and read from IO if available.

Yields queue items until the queue and IO (if available) are empty and the total number of requests to run is met (if number is set).



# File 'lib/kronk/queue_runner.rb', line 139

def concurrently concurrency=1, &block
  @max_queue_size = concurrency * 2

  until_finished do |count, active_count|
    if active_count >= concurrency || @queue.empty?
      Thread.pass
      next
    end

    num_threads = smaller_count(concurrency - active_count)

    num_threads.times do
      yield_queue_item(&block)
    end
  end
end
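
One scheduling pass of the loop above can be restated as a small pure function, which makes the throttling rule easier to check by hand. This is only a sketch: `threads_to_start` and its keyword arguments are hypothetical names, and the cap on remaining items is an assumption standing in for `smaller_count`, whose source is not shown on this page.

```ruby
# Hypothetical restatement of one pass of #concurrently.
# ASSUMPTION: capping at the items still to process stands in for
# smaller_count, which is not documented here.
def threads_to_start(concurrency:, active:, queued:, remaining: nil)
  # Nothing is started while all slots are busy or the queue is empty.
  return 0 if active >= concurrency || queued.zero?

  free = concurrency - active

  # With an item limit set, never start more threads than items remain.
  remaining && remaining < free ? remaining : free
end
```

As in the original loop, the queue length only gates whether anything is started at all; it does not cap the number of threads, since yield_queue_item waits for items to arrive.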

#finish ⇒ Object

Stop runner processing gracefully.



# File 'lib/kronk/queue_runner.rb', line 94

def finish
  stop_input!

  @threads.each do |t|
    @rthreads << Thread.new(t.value){|value| trigger :result, value }
  end

  @rthreads.each(&:join)

  @threads.clear
  @rthreads.clear
end

#finished? ⇒ Boolean

Returns true if processing queue should be stopped, otherwise false.

Returns:

  • (Boolean)


# File 'lib/kronk/queue_runner.rb', line 83

def finished?
  return true if @number && @count >= @number

  @queue.empty? && @count > 0 &&
    (!@reader_thread || !@reader_thread.alive?)
end
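
The stop condition depends on only four values, so it can be restated outside the class and checked case by case. The sketch below does that; `finished_state?` and its keyword arguments are hypothetical names used for illustration and are not part of Kronk.

```ruby
# Stand-alone restatement of the stop condition shown above.
# finished_state? is a hypothetical helper, not a Kronk method.
def finished_state?(number:, count:, queue_empty:, reader_alive:)
  # A configured item limit always wins once it has been reached.
  return true if number && count >= number

  # Otherwise stop only when at least one item has been processed,
  # nothing is queued, and no reader thread can add more items.
  queue_empty && count > 0 && !reader_alive
end
```

Note that with count at 0 and no item limit the condition is false even when the queue is empty and no reader is running, so a runner that never receives an item does not report itself as finished.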

#kill ⇒ Object

Immediately end all runner processing and threads.



# File 'lib/kronk/queue_runner.rb', line 111

def kill
  stop_input!
  @threads.each{|t| t.kill}
  @rthreads.each{|t| t.kill}
  @threads.clear
  @rthreads.clear
end

#on(trigger_name, &block) ⇒ Object

Specify a block to run for a given trigger name. Supported triggers are:

:complete

Called after queue and input have been fully processed.

:input

Called every time the queue needs populating.

:interrupt

Called when SIGINT is captured.

:start

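Called before queue starts being processed.

:result

Called with the return value of the block given to QueueRunner#run, once per processed item.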



# File 'lib/kronk/queue_runner.rb', line 128

def on trigger_name, &block
  @triggers[trigger_name] = block
end

#periodically(period = 0.01, clump = 1, &block) ⇒ Object

Process the queue with periodic timer and a given period in seconds.

Yields queue items until the queue and IO (if available) are empty and the total number of requests to run is met (if number is set).

If clump is given, fires clump requests at a time, which changes how often requests are sent. The overall period should nevertheless stay close to the one specified.



# File 'lib/kronk/queue_runner.rb', line 167

def periodically period=0.01, clump=1, &block
  @max_queue_size = 0.5 / period
  @max_queue_size = clump * 2 if @max_queue_size < (clump * 2)

  start = Time.now

  until_finished do |count, active_count|
    num_threads    = 1
    expected_count = ((Time.now - start) / period).ceil

    if count < expected_count
      num_threads = smaller_count(expected_count - count)
    else
      sleep_time = period * smaller_count(clump)
      sleep sleep_time
    end

    num_threads.times do
      yield_queue_item(&block)
    end
  end
end
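
The pacing in the method above comes down to two small calculations: how many items should have been started after a given elapsed time, and how large the queue is allowed to grow. The sketch below isolates that arithmetic; `expected_count` and `periodic_max_queue_size` are hypothetical helper names, not Kronk methods.

```ruby
# Hypothetical helpers restating the arithmetic used by #periodically.

# Number of items that should have been started after `elapsed` seconds
# when one item is due every `period` seconds.
def expected_count(elapsed, period)
  (elapsed / period).ceil
end

# Queue limit: roughly half a second of work, but never less than
# two clumps.
def periodic_max_queue_size(period, clump)
  size = 0.5 / period
  size < clump * 2 ? clump * 2 : size
end

expected_count(1.25, 0.5)          # => 3
periodic_max_queue_size(0.125, 4)  # => 8
```

When the actual count falls behind the expected count, the loop starts the missing items at once; otherwise it sleeps for the period multiplied by the clump size before starting the next item.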

#start_input!(max_queue = @max_queue_size) ⇒ Object

Attempt to fill the queue by repeatedly calling the :input trigger. Starts a new thread and returns the thread instance, or returns nil without starting a thread if no :input trigger is defined.



# File 'lib/kronk/queue_runner.rb', line 258

def start_input! max_queue=@max_queue_size
  return unless @triggers[:input]

  @reader_thread = Thread.new do
    begin
      loop do
        if max_queue && @queue.length >= max_queue
          Thread.pass
          next
        end

        while !max_queue || @queue.length < max_queue
          item = trigger(:input)
          @qmutex.synchronize{ @queue << item }
        end
        Thread.pass
      end

    rescue => e
      Thread.main.raise e
    end
  end
end
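
The reader thread follows a bounded-producer pattern: keep appending items under a mutex, but back off while the queue is at its limit. The sketch below shows that pattern on its own, simplified to a single loop without the error forwarding; `fill_in_background` and its parameters are hypothetical names and not part of Kronk.

```ruby
# Stand-alone sketch of a bounded producer thread.
# fill_in_background is a hypothetical helper, not a Kronk method.
def fill_in_background(queue, mutex, max_queue, &input)
  Thread.new do
    loop do
      # Back off while the queue is full so consumers can drain it.
      if queue.length >= max_queue
        Thread.pass
        next
      end

      # Fetch the next item outside the lock, then append under it.
      item = input.call
      mutex.synchronize { queue << item }
    end
  end
end
```

The returned thread runs until it is killed, which is the role stop_input! plays in the class.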

#stop_input! ⇒ Object

Permanently stop input reading by killing the reader thread for a given QueueRunner#run or QueueRunner#process_queue session.



# File 'lib/kronk/queue_runner.rb', line 287

def stop_input!
  Thread.pass
  @reader_thread && @reader_thread.kill
end

#trigger(name, *args) ⇒ Object

Run a previously defined callback. See QueueRunner#on.



# File 'lib/kronk/queue_runner.rb', line 296

def trigger name, *args
  t = @triggers[name]
  t && t.call(*args)
end
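
Taken together, on and trigger form a one-callback-per-name registry. The sketch below reproduces that pair in a tiny stand-in class so the behavior can be tried without loading Kronk; `TriggerTable` is a hypothetical name used only for this illustration.

```ruby
# Minimal stand-in for the on/trigger pair.
# TriggerTable is a hypothetical class, not part of Kronk.
class TriggerTable
  def initialize
    @triggers = {}
  end

  # Store the block under the given name, replacing any earlier one.
  def on(name, &block)
    @triggers[name] = block
  end

  # Call the stored block with the given arguments, or return nil
  # when no block was registered for the name.
  def trigger(name, *args)
    t = @triggers[name]
    t && t.call(*args)
  end
end
```

Because the table is a plain Hash keyed by name, registering a second block for the same trigger replaces the first rather than adding to it, and firing an unregistered trigger is a harmless no-op.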

#until_finished ⇒ Object

Loop and read from input continually until finished. Yields total_count and active_count if passed a block.



# File 'lib/kronk/queue_runner.rb', line 196

def until_finished
  old_trap = trap 'INT' do
    @stop_time = Time.now
    kill
    trigger(:interrupt)
    trap 'INT', old_trap
    Process.kill 'INT', Process.pid
  end

  @start_time = Time.now

  trigger :start

  start_input!
  @count = 0

  until finished?
    @rthreads.delete_if{|t| !t.alive? && t.join }

    results = []
    @threads.delete_if do |t|
      !t.alive? &&
        results << t.value
    end

    @rthreads << Thread.new(results) do |values|
      values.each{|value| trigger :result, value }
    end unless results.empty?

    yield @count, @threads.count if block_given?
  end

  @stop_time = Time.now

  finish

  trigger :complete
end
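
The harvesting step inside the loop relies on a compact idiom: `delete_if` removes a worker thread only when it is dead, and the same expression collects its return value. The sketch below isolates that step; `harvest_finished` is a hypothetical helper name, not a Kronk method.

```ruby
# Hypothetical helper mirroring the harvesting step in #until_finished.
def harvest_finished(threads)
  results = []

  threads.delete_if do |t|
    # Array#<< returns the array itself, which is truthy, so a dead
    # thread is both recorded and removed by this one expression.
    # Live threads short-circuit on !t.alive? and stay in the list.
    !t.alive? && results << t.value
  end

  results
end
```

In the method above, the collected values are then handed to a separate thread that fires the :result trigger, so result handling does not hold up the scheduling loop.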

#yield_queue_item ⇒ Object

Shifts one item off the queue and yields it to the given block in a new thread, waiting for an item for as long as the reader thread is alive.



# File 'lib/kronk/queue_runner.rb', line 239

def yield_queue_item
  until item = @qmutex.synchronize{ @queue.shift } or !@reader_thread.alive?
    Thread.pass
  end

  return unless item

  @threads << Thread.new(item) do |q_item|
    yield q_item if block_given?
  end

  @count += 1
end