Module: TaskBatcher
Defined Under Namespace
Classes: Error
Instance Method Summary
- #batch_duration(taskname) ⇒ Fixnum
  Return the batch duration for a particular task name.
- #default_batch_duration ⇒ Fixnum
  Return the default batch duration in seconds.
- #process_batch(taskname) ⇒ Object
  Run the pending batch for a task name; normally called by EventMachine when the batch duration expires.
- #reset ⇒ Object
  Reset all state; forget everything about batches in progress, tasknames, customized durations, etc.
- #set_batch_duration(taskname, seconds) ⇒ Object
  Set the batch duration for a particular task name.
- #set_default_batch_duration(seconds) ⇒ Object
  Set the global default batch duration in seconds (without this call, the default is METADEFAULT_DURATION).
- #task(taskname, task_args_hash, callback = nil, &block) ⇒ Object
  Add a task to a batch, or start a new batch if none exists with this taskname.
Instance Method Details
#batch_duration(taskname) ⇒ Fixnum
Return the batch duration for a particular task name.
# File 'lib/task_batcher.rb', line 140

def batch_duration(taskname)
  durations[taskname] || default_batch_duration
end
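The lookup in #batch_duration relies on Ruby's `||`, where only `nil` and `false` are falsy, so a per-taskname duration of `0` is honored rather than falling back to the default. The sketch below mirrors the three-level chain (per-taskname value, global default, METADEFAULT_DURATION) in a hypothetical `DurationTable` class; the fallback value of 60 seconds is an assumption for illustration, not the library's actual METADEFAULT_DURATION.

```ruby
# Hypothetical class mirroring TaskBatcher's duration lookup chain.
# METADEFAULT_DURATION = 60 is an ASSUMED value, for illustration only.
class DurationTable
  METADEFAULT_DURATION = 60

  def initialize
    @durations = {}               # per-taskname overrides
    @default_batch_duration = nil # global default, unset until configured
  end

  def set_batch_duration(taskname, seconds)
    @durations[taskname] = seconds
  end

  def set_default_batch_duration(seconds)
    @default_batch_duration = seconds
  end

  def default_batch_duration
    @default_batch_duration || METADEFAULT_DURATION
  end

  # A per-taskname value wins; otherwise fall back to the default chain.
  def batch_duration(taskname)
    @durations[taskname] || default_batch_duration
  end
end

table = DurationTable.new
table.batch_duration('emails')        # => 60 (METADEFAULT_DURATION)
table.set_default_batch_duration(5)
table.batch_duration('emails')        # => 5
table.set_batch_duration('emails', 0) # 0 is truthy in Ruby, so it is kept
table.batch_duration('emails')        # => 0
```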
#default_batch_duration ⇒ Fixnum
Return the default batch duration in seconds. (This will be the value of METADEFAULT_DURATION, unless you have called #set_default_batch_duration.)
# File 'lib/task_batcher.rb', line 125

def default_batch_duration
  @default_batch_duration || METADEFAULT_DURATION
end
#process_batch(taskname) ⇒ Object
Generally, #process_batch is called by EventMachine when the batch duration expires. It is public so that callers can also process a batch explicitly, without waiting for the timer.
# File 'lib/task_batcher.rb', line 196

def process_batch(taskname)
  this_batch = nil

  # Grab the batch. If another TaskBatcher call is added immediately after
  # this synchronize step, then it just starts a new batch.
  mutex.synchronize do
    this_batch = batches[taskname]
    batches[taskname] = nil
  end
  return nil if this_batch.nil?

  # Grab the block, list of batch args, and callback. Then run the batch!
  batch_processor = this_batch[:block]
  args_hashes = this_batch[:args_hashes]
  callback = this_batch[:callback] || lambda {|arg_hashes| nil}

  begin
    results = batch_processor[args_hashes]
  rescue Exception => e
    raise Error, "Exception raised during TaskBatcher processing:\n" +
      "#{e.backtrace.join("\n ")}"
  end

  # Run the callback. The callback's return value isn't accessible in any
  # meaningful way.
  callback[results]
end
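#process_batch wraps any exception from the batch block in TaskBatcher::Error, and the wrapper's message carries only the backtrace. On Ruby 2.1 and later, an exception raised inside a `rescue` clause automatically records the rescued exception as its `cause`, so a caller can usually still recover the original class and message. A minimal sketch of that wrap-and-reraise step, using a hypothetical `BatchDemo` module rather than TaskBatcher itself:

```ruby
# Hypothetical module; only the wrap-and-reraise step mirrors
# TaskBatcher#process_batch (which rescues Exception, not StandardError).
module BatchDemo
  class Error < StandardError; end

  # Run the batch block on all args hashes; wrap any failure.
  def self.run_batch(processor, args_hashes)
    processor.call(args_hashes)
  rescue StandardError => e
    # Raising inside rescue makes Ruby record e as the new exception's #cause.
    raise Error, "Exception raised during batch processing:\n" +
                 e.backtrace.join("\n  ")
  end
end

begin
  BatchDemo.run_batch(->(hashes) { hashes.sum { |h| 1 / h[:n] } }, [{ n: 0 }])
rescue BatchDemo::Error => err
  puts err.cause.class   # prints ZeroDivisionError
  puts err.cause.message # prints "divided by 0"
end
```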
#reset ⇒ Object
Reset all state; forget everything about batches in progress, tasknames, customized durations, etc. If no EventMachine reactor is running, also start one on a background thread owned by TaskBatcher.
# File 'lib/task_batcher.rb', line 101

def reset
  @default_batch_duration = @batch_durations = @batches = @mutex = nil

  # Other system components might be using EM. If an EM reactor is running,
  # do nothing. If not, start one on a thread that we'll own.
  if not EM.reactor_running?
    @em_thread = Thread.new { EM.run }
    while not EM.reactor_running?; sleep 0.01; end
  end
end
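#reset starts an EventMachine reactor only when none is running, then waits until the reactor reports that it is up, so that a later #task call can schedule timers safely. The sketch below shows the same start-once, wait-until-ready pattern with plain Ruby threads. `ToyLoop` is hypothetical and is not EventMachine; it also signals readiness through a `Queue` instead of the sleep-polling loop used in the source.

```ruby
# Hypothetical event-loop stand-in (NOT EventMachine). It illustrates only
# the "start a background loop if none is running, then wait until it is
# ready" pattern used by TaskBatcher#reset.
class ToyLoop
  def initialize
    @jobs = Queue.new
    @thread = nil
    @lock = Mutex.new
  end

  def running?
    !@thread.nil? && @thread.alive?
  end

  # Start the loop at most once; block until the worker thread is ready.
  def ensure_running
    @lock.synchronize do
      return @thread if running?
      ready = Queue.new
      @thread = Thread.new do
        ready << true                    # tell the caller we are up
        while (job = @jobs.pop) != :stop # run jobs until told to stop
          job.call
        end
      end
      ready.pop                          # wait for readiness, no polling
      @thread
    end
  end

  def schedule(&job)
    @jobs << job
  end

  def stop
    @jobs << :stop
    @thread&.join
  end
end

reactor = ToyLoop.new
first = reactor.ensure_running
second = reactor.ensure_running # already running: same thread, no new loop
results = Queue.new
reactor.schedule { results << :done }
results.pop                     # => :done
reactor.stop
```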
#set_batch_duration(taskname, seconds) ⇒ Object
Set the batch duration for a particular task name.
# File 'lib/task_batcher.rb', line 133

def set_batch_duration(taskname, seconds)
  durations[taskname] = seconds
end
#set_default_batch_duration(seconds) ⇒ Object
Set the global default batch duration, in seconds. Without this call, the default batch duration is the value of METADEFAULT_DURATION. Durations set for individual tasknames with #set_batch_duration still take precedence over this default.
# File 'lib/task_batcher.rb', line 117

def set_default_batch_duration(seconds)
  @default_batch_duration = seconds
end
#task(taskname, task_args_hash, callback = nil, &block) ⇒ Object
Add a task to a batch, or start a new batch if none exists with this taskname.
Once the batch duration expires, all tasks in the batch are run together: the block is passed an array of every task_args_hash collected for that taskname and is expected to process the entire array in one call. The callback, if given, then receives the block's return value.
Usage:
TaskBatcher.task('taskname', var1: 'foo', var2: 'bar') do |args_hashes|
  var1s = args_hashes.map { |h| h[:var1] }
  var2s = args_hashes.map { |h| h[:var2] }
  print "This batcher was invoked with #{args_hashes.count} tasks.\n"
  print "Hopefully the tasks can be batched efficiently.\n"
  print "Then it would be faster to invoke 'func1(var1s); func2(var2s)'\n"
  print "instead of 'args_hashes.each { |h| func12(h[:var1], h[:var2]) }'\n"
end
Only the batch’s FIRST callback and block are respected!
# Assuming no 'mytask' batch is pending at this point...
TaskBatcher.task('mytask', args1, callback1) { |hashes| func1(hashes) }
TaskBatcher.task('mytask', args2, callback2) { |hashes| func2(hashes) }
TaskBatcher.task('mytask', args3, callback3) { |hashes| func3(hashes) }
# ...then this batch invokes func1([args1, args2, args3]) and passes the
# result to callback1. func2/func3 and callback2/callback3 are never used!
# File 'lib/task_batcher.rb', line 172

def task(taskname, task_args_hash, callback=nil, &block)
  raise Error, 'taskname required' if taskname.nil?

  mutex.synchronize do
    this_batch = batches[taskname]
    if this_batch.nil?
      batches[taskname] = {
        block: block,
        callback: callback,
        args_hashes: [ task_args_hash ],
      }
      EM.add_timer(batch_duration(taskname)) do
        EM.defer { process_batch(taskname) }
      end
    else
      this_batch[:args_hashes] << task_args_hash
    end
  end
end
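#task stores the block and callback only when it opens a new batch, schedules exactly one timer per batch, and otherwise just appends the args hash; the timer later takes the whole batch under the mutex and runs it. The sketch below reproduces that flow with the standard library, swapping `EM.add_timer` plus `EM.defer` for a sleeping `Thread`. `ThreadBatcher` and its `flush` method are hypothetical names, and starting one thread per batch is a simplification, not what the library does.

```ruby
# Hypothetical sketch of TaskBatcher#task / #process_batch using plain
# threads in place of EM.add_timer + EM.defer.
class ThreadBatcher
  def initialize(duration)
    @duration = duration
    @batches = {}
    @mutex = Mutex.new
  end

  # Returns the timer thread if this call opened a new batch, else nil.
  def task(taskname, args_hash, callback = nil, &block)
    raise ArgumentError, 'taskname required' if taskname.nil?
    @mutex.synchronize do
      batch = @batches[taskname]
      if batch.nil?
        # First task: remember its block/callback and start one timer.
        @batches[taskname] = { block: block, callback: callback,
                               args_hashes: [args_hash] }
        Thread.new do
          sleep @duration
          flush(taskname)
        end
      else
        # Later tasks only contribute args; their blocks are ignored.
        batch[:args_hashes] << args_hash
        nil
      end
    end
  end

  # Take the batch and clear its slot atomically, then run it unlocked.
  def flush(taskname)
    batch = @mutex.synchronize { @batches.delete(taskname) }
    return nil if batch.nil?
    results = batch[:block].call(batch[:args_hashes])
    batch[:callback]&.call(results)
    results
  end
end

seen = Queue.new
batcher = ThreadBatcher.new(0.3)
timer = batcher.task('sum', { n: 1 }, ->(total) { seen << total }) do |hashes|
  hashes.sum { |h| h[:n] }
end
batcher.task('sum', { n: 2 }) { raise 'never called' }
batcher.task('sum', { n: 3 })
timer.join
seen.pop # => 6
```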