Module: TaskBatcher

Extended by:
TaskBatcher
Included in:
TaskBatcher
Defined in:
lib/task_batcher.rb

Defined Under Namespace

Classes: Error

Instance Method Summary

Instance Method Details

#batch_duration(taskname) ⇒ Fixnum

Return the batch duration for a particular task name.

Returns:

  • (Fixnum)

    number of seconds to delay before running this batch



# File 'lib/task_batcher.rb', line 140

def batch_duration(taskname)
  durations[taskname] || default_batch_duration
end

#default_batch_duration ⇒ Fixnum

Return the default batch duration in seconds. This is the value of METADEFAULT_DURATION unless #set_default_batch_duration has been called.

Returns:

  • (Fixnum)

    the default batch duration in seconds



# File 'lib/task_batcher.rb', line 125

def default_batch_duration
  @default_batch_duration || METADEFAULT_DURATION
end

#process_batch(taskname) ⇒ Object

Normally EventMachine calls #process_batch when the batch duration expires. The method is public so that a caller can also process a batch explicitly. It returns nil if no batch is pending for the given taskname.

Parameters:

  • taskname (String, Symbol)

    the name of the batch to process



# File 'lib/task_batcher.rb', line 196

def process_batch(taskname)
  this_batch = nil

  # Grab the batch.  If another TaskBatcher call is added immediately after
  # this synchronize step, then it just starts a new batch.
  mutex.synchronize do
    this_batch = batches[taskname]
    batches[taskname] = nil
  end
  return nil if this_batch.nil?

  # Grab the block, list of batch args, and callback.  Then run the batch!
  batch_processor = this_batch[:block]
  args_hashes = this_batch[:args_hashes]
  callback = this_batch[:callback] || lambda {|arg_hashes| nil}
  begin
    results = batch_processor[args_hashes]
  rescue Exception => e
    raise Error, "Exception raised during TaskBatcher processing:\n" +
      "#{e.backtrace.join("\n   ")}"
  end

  # Run the callback.  The callback's return value isn't accessible in any
  # meaningful way.
  callback[results]
end
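
The steps in #process_batch (take the pending batch under the lock, run the block outside the lock, then pass the results to the callback) can be sketched in isolation. The following is a minimal stand-alone model, not the module's own code: BatchRunner, BatchError and the install helper are hypothetical names introduced for illustration. It also differs deliberately from the listing above in that it rescues only StandardError and includes the original message in the wrapped error.

```ruby
# Stand-alone sketch of the process_batch steps; NOT the real TaskBatcher code.
class BatchError < StandardError; end  # hypothetical stand-in for TaskBatcher::Error

class BatchRunner
  def initialize
    @mutex = Mutex.new
    @batches = {}
  end

  # Hypothetical helper: install a ready-made batch so the sketch needs no timer.
  def install(taskname, args_hashes, callback = nil, &block)
    @mutex.synchronize do
      @batches[taskname] = { block: block, callback: callback, args_hashes: args_hashes }
    end
  end

  def process(taskname)
    # Take the batch and clear its slot while holding the lock, so any task
    # added afterwards starts a fresh batch.
    batch = @mutex.synchronize { @batches.delete(taskname) }
    return nil if batch.nil?

    # Fall back to a no-op callback when none was supplied.
    callback = batch[:callback] || lambda { |_results| nil }
    begin
      results = batch[:block].call(batch[:args_hashes])
    rescue StandardError => e
      raise BatchError, "batch #{taskname.inspect} failed: #{e.message}"
    end
    callback.call(results)
  end
end

runner = BatchRunner.new
seen = []
runner.install('double', [{ n: 1 }, { n: 2 }], lambda { |r| seen << r }) do |hashes|
  hashes.map { |h| h[:n] * 2 }
end
runner.process('double')  # runs the block, then hands [2, 4] to the callback
runner.process('double')  # nothing is pending any more, so this returns nil
```

Because the slot is cleared inside the synchronized section, a second call for the same name finds nothing to do, which mirrors the `return nil if this_batch.nil?` guard in the listing.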

#reset ⇒ Object

Reset all state; forget everything about batches in progress, tasknames, customized durations, etc.



# File 'lib/task_batcher.rb', line 101

def reset
  @default_batch_duration = @batch_durations = @batches = @mutex = nil

  # Other system components might be using EM.  If an EM reactor is running,
  # do nothing.  If not, start one on a thread that we'll own.
  if not EM.reactor_running?
    @em_thread = Thread.new { EM.run }
    while not EM.reactor_running?; sleep 0.01; end
  end
end

#set_batch_duration(taskname, seconds) ⇒ Object

Set the batch duration for a particular task name.

Parameters:

  • taskname (String, Symbol)

    taskname for which we set the batch duration

  • seconds (Fixnum)

    number of seconds to delay before running the batch



# File 'lib/task_batcher.rb', line 133

def set_batch_duration(taskname, seconds)
  durations[taskname] = seconds
end

#set_default_batch_duration(seconds) ⇒ Object

With no guidance from the client, the default batch duration is the value of METADEFAULT_DURATION. Clients can override that global default with this method, in addition to setting the batch duration of an individual taskname with #set_batch_duration.

Parameters:

  • seconds (Fixnum)

    default batch duration in seconds



# File 'lib/task_batcher.rb', line 117

def set_default_batch_duration(seconds)
  @default_batch_duration = seconds
end
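
Taken together, the four duration methods form a fallback chain: a per-task duration wins, then the client-set default, then METADEFAULT_DURATION. The sketch below models that chain in a stand-alone class so it can run without the gem. DurationLookup is a hypothetical name, and the value 60 is a placeholder, since the real value of METADEFAULT_DURATION is not shown on this page.

```ruby
# Stand-alone model of the duration lookup; NOT the real TaskBatcher module.
class DurationLookup
  # Placeholder value (assumption): the real constant is not shown on this page.
  METADEFAULT_DURATION = 60

  def initialize
    @default_batch_duration = nil
    @durations = {}
  end

  def set_default_batch_duration(seconds)
    @default_batch_duration = seconds
  end

  def default_batch_duration
    @default_batch_duration || METADEFAULT_DURATION
  end

  def set_batch_duration(taskname, seconds)
    @durations[taskname] = seconds
  end

  # Per-task value first, then the client default, then the metadefault.
  def batch_duration(taskname)
    @durations[taskname] || default_batch_duration
  end
end

lookup = DurationLookup.new
lookup.batch_duration('email')         # falls through to the placeholder metadefault
lookup.set_default_batch_duration(10)
lookup.batch_duration('email')         # now the client-set default, 10
lookup.set_batch_duration('email', 2)
lookup.batch_duration('email')         # per-task value, 2
lookup.batch_duration('sms')           # other tasknames still use the default, 10
```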

#task(taskname, task_args_hash, callback = nil, &block) ⇒ Object

Add a task to a batch, or start a new batch if none exists with this taskname.

Once the batch duration expires, the whole batch runs at once: the block is passed the list of task_args_hash values and is expected to process that entire list.

Usage:

TaskBatcher.task('taskname', var1: 'foo', var2: 'bar') do |args_hashes|
  var1s = args_hashes.map { |h| h[:var1] }
  var2s = args_hashes.map { |h| h[:var2] }
  print "This batcher was invoked with #{args_hashes.count} tasks.\n"
  print "Hopefully the tasks can be batched efficiently.\n"
  print "Then it would be faster to invoke 'func1(var1s); func2(var2s)' \n"
  print "instead of 'args_hashes.each { |h| func12(h[:var1], h[:var2]) }'\n"
end

Only the batch’s FIRST callback and block are respected!

# assuming the batch is empty here...

TaskBatcher.task('mytask', args1, callback1) { func1 }
TaskBatcher.task('mytask', args2, callback2) { func2 }
TaskBatcher.task('mytask', args3, callback3) { func3 }

# ... then this batch will invoke func1([args1, args2, args3]) and invoke
# callback1 on the result.  func2/func3 and callback2/callback3 will
# never be used!

Raises:

  • (Error)

    if taskname is nil



# File 'lib/task_batcher.rb', line 172

def task(taskname, task_args_hash, callback=nil, &block)
  raise Error, 'taskname required' if taskname.nil?
  mutex.synchronize do
    this_batch = batches[taskname]
    if this_batch.nil?
      batches[taskname] = {
        block:       block,
        callback:    callback,
        args_hashes: [ task_args_hash ],
      }
      EM.add_timer(batch_duration(taskname)) do
        EM.defer { process_batch(taskname) }
      end
    else
      this_batch[:args_hashes] << task_args_hash
    end
  end
end
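
The rule that only the first block and callback of a batch are respected follows from the branch in #task: the block and callback are stored only when a new batch is created, and later calls just append their arguments. The following stand-alone sketch illustrates that behaviour under stated assumptions. FirstWinsBatcher and its flush method are hypothetical; flush stands in for the EventMachine timer firing, and the sketch raises ArgumentError rather than the module's own Error class.

```ruby
# Stand-alone model of the accumulate-then-run behaviour; NOT the real module.
class FirstWinsBatcher
  def initialize
    @mutex = Mutex.new
    @batches = {}
  end

  def task(taskname, task_args_hash, callback = nil, &block)
    raise ArgumentError, 'taskname required' if taskname.nil?
    @mutex.synchronize do
      batch = @batches[taskname]
      if batch.nil?
        # Only the call that opens the batch contributes a block and callback.
        @batches[taskname] = { block: block, callback: callback,
                               args_hashes: [task_args_hash] }
      else
        # Later calls contribute arguments only.
        batch[:args_hashes] << task_args_hash
      end
    end
  end

  # Hypothetical stand-in for the timer expiring.
  def flush(taskname)
    batch = @mutex.synchronize { @batches.delete(taskname) }
    return nil if batch.nil?
    results = batch[:block].call(batch[:args_hashes])
    batch[:callback].call(results) if batch[:callback]
    results
  end
end

batcher = FirstWinsBatcher.new
log = []
batcher.task('mytask', { id: 1 }, lambda { |r| log << [:callback1, r] }) { |hs| hs.map { |h| h[:id] } }
batcher.task('mytask', { id: 2 }, lambda { |r| log << [:callback2, r] }) { |_hs| :func2 }
batcher.task('mytask', { id: 3 }, lambda { |r| log << [:callback3, r] }) { |_hs| :func3 }
batcher.flush('mytask')
# log now holds a single entry, from callback1, covering all three tasks
```

A practical consequence is that every caller sharing a taskname should pass an equivalent block, and any per-task information the callback needs should travel inside task_args_hash.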