require 'rubygems'
require 'eventmachine'
require 'deferred'
require 'logger'

# Global logger shared by every class in this example: writes to stderr at
# DEBUG level.
$log = Logger.new($stderr)
$log.level = Logger::DEBUG

# This is a real-world example of how we use this gem in a production setting.
#
# Surrounding this would be a daemon that's listening for events that represent
# asynchronous tasks that need to be run (what the simplified TaskRunner class
# below represents). We have components in our stack that are not evented, so we
# need to run tasks on the threadpool. In addition, some of those tasks (like the
# MainTask below) represent a "meta" task, a task that may have several
# sub-components that can be run simultaneously. An example of this kind of task
# would be one that needs to retrieve multiple resources from the web. The "meta"
# task is "retrieve all sources," which can be divided into N tasks that can be
# run concurrently.
#
# This code performs the following steps:
#
# * The TaskRunner creates a MainTask, and sets things up so that it can run its
#   process! method in the threadpool
#
# * The TaskRunner block is scheduled and the MainTask is run
#
# * The MainTask process! method:
#   * sets up a callback that will call the handle_success method on successful
#     completion of all subtasks (using the on_iterator_finished event)
#   * sets up an errback that will call handle_failure in the case of any
#     SubThreadTask failure (using the on_iterator_finished event)
#   * sets up a next_tick block and returns self, effectively freeing the
#     thread.
#     (This is technically kind of a waste of the use of a thread, but
#     for consistency with the rest of our task system, it's easier than
#     special casing this)
#
# * Since MainTask is a Deferred, the TaskRunner will chain itself to the MainTask,
#   meaning that when the MainTask calls back, the TaskRunner will inspect its return
#   value and take the appropriate action (it will either be done, or if it's another
#   Deferred it will wait on *that*, and if it's also a ThreadpoolJob it will
#   call defer! on that return value).
#
# * the reactor ticks
#
# * The next_tick block runs an EM::Iterator to create SubThreadTasks which
#   will be run in their own threads. When all subtasks are complete it fires
#   the on_iterator_finished event. If any SubThreadTask fails,
#   on_iterator_finished will errback immediately; all running tasks will
#   complete, but their results will be ignored.
#
# * The SubThreadTasks all complete
#
# * on_iterator_finished is fired and in order to handle the result MainTask will
#   need to perform another ThreadpoolJob. We set up the ThreadpoolJob, and call
#   succeed on the MainTask with that ThreadpoolJob as an argument. The MainTask
#   will now depend on that job's successful completion
#
# * the cleanup task runs in a separate thread
#
# * the cleanup task completes and the main TaskRunner job is done.
#
#
# Now, this may seem like complete overkill, however it shows how flexible the
# ThreadpoolJob is (credit to Topper Bowers who figured out the recursive
# behavior for ThreadpoolJob#handle_result, ALL PRAISE TO THE HYPNOTOAD!). We
# have a myriad of different solutions, some sync, some async. The flexibility
# of Deferred allows us to leverage EventMachine to manage the overall
# concurrency in a sensible way, but use the threadpool freely, and therefore
# gives us an incredibly powerful tool.
#
module ThreadpoolJobSubtasks
  # One blocking unit of work: sleeps for +pause_for_sec+ seconds, then
  # succeeds with a message describing the thread it ran on and how long it
  # slept.
  class SubThreadTask
    include Deferred::ThreadpoolJob

    attr_accessor :pause_for_sec

    # pause_for_sec:: number of seconds #process! will sleep
    def initialize(pause_for_sec)
      @pause_for_sec = pause_for_sec
      # NOTE(review): presumably ThreadpoolJob invokes @on_run_block when the
      # job is deferred -- confirm against the deferred gem.
      @on_run_block = method(:process!).to_proc
    end

    # Logs, sleeps for the configured duration, then succeeds this task with a
    # completion message.
    def process!
      thread_id = Thread.current.object_id
      $log.debug { format("SubThreadTask: Thread id: %16x sleeping %0.9f sec", thread_id, @pause_for_sec) }

      sleep(@pause_for_sec)

      succeed(format("SubThreadTask: Thread id: %16x complete %0.9f sec", Thread.current.object_id, @pause_for_sec))
    end
  end

  # The "meta" task: fans out one SubThreadTask per configured duration (at
  # most five at a time) and reports the overall outcome through the
  # iterator_finished deferred event.
  class MainTask
    include Deferred
    include Deferred::Accessors

    deferred_event :iterator_finished

    def initialize
      @durations = []
    end

    # durations:: list of sleep times (seconds), one per SubThreadTask
    def wait_for_durations=(durations)
      @durations = durations
    end

    # Wires the success/failure handlers onto on_iterator_finished, schedules
    # the fan-out for the next reactor tick and returns self immediately.
    def process!
      $log.debug { "MainTask#process! called, going to schedule sleeps for #{@durations.inspect} seconds concurrently" }

      on_iterator_finished.callback { |*| handle_success }
      on_iterator_finished.errback { |e| handle_failure(e) }

      EM.next_tick do
        # Invoked once per duration: builds a subtask and sends it to the
        # threadpool. The iterator only advances when the subtask succeeds.
        run_subtask = lambda do |dur, iter|
          subtask = SubThreadTask.new(dur)
          subtask.before_run.callback { $log.debug { "SubThreadTask: about to sleep #{dur}" } }
          subtask.callback do |message|
            $log.debug(message)
            @durations.delete(dur)
            iter.next
          end
          subtask.errback { |e| on_iterator_finished.fail(e) }
          subtask.defer!
        end

        # Invoked by the iterator once every item has been processed.
        all_done = lambda do
          $log.debug { "MainTask: all SubThreadTasks completed, @durations: #{@durations.inspect}" }
          on_iterator_finished.succeed
        end

        # Iterate over a copy: the subtask callbacks delete from @durations.
        EM::Iterator.new(@durations.dup, 5).each(run_subtask, all_done)
      end

      self
    end

    # On success we need to perform another blocking action, so we build and
    # return a ThreadpoolJob instance (actually, anything that
    # responds_to?(:defer!) would do).
    def handle_success
      $log.debug { "MainTask: all SubThreadTasks have completed running successfully, now we do some other action on the threadpool before we're done" }

      followup = Deferred::DefaultThreadpoolJob.new
      followup.on_run { sleep(1.0) }
      followup.before_run { $log.debug { "MainTask: start some blocking action after success" } }
      followup.callback { $log.debug { "MainTask: some blocking action after success finished" } }
      followup.errback { |e| raise e }

      # Here we do something fairly tricky. Since this is all done for
      # side-effect, we can signal that we're done processing (i.e. the main
      # task is complete), but do some kind of asynchronous "next task" by
      # succeeding with a ThreadpoolJob.
      $log.debug { "MainTask: succeeding the main task with another ThreadpoolJob" }
      succeed(followup)

      followup
    end

    # Logs the error and fails the main task with it.
    def handle_failure(exc)
      $log.debug { "Oh noes! an error occurred! #{exc.inspect}" }
      # Explicit receiver kept on purpose so this is not read as Kernel#fail.
      self.fail(exc)
    end
  end

  # This needs to run MainTask so that we don't get re-scheduled over and over
  # (as if MainTask responds_to?(:defer!) it'll get deferred repeatedly).
  class TaskRunner
    include Deferred::ThreadpoolJob
  end

  # Entry point: starts the reactor, runs a MainTask with 20 random sleep
  # durations through a TaskRunner, and stops the event loop once the runner
  # has finished.
  def self.run!
    sub_task_durations = Array.new(20) { rand }

    EM.run do
      main_task = MainTask.new
      main_task.wait_for_durations = sub_task_durations

      runner = TaskRunner.new
      runner.on_run { main_task.process! }
      runner.callback { $log.debug { "TaskRunner: main task completed" } }
      runner.errback { |e| raise e }
      runner.ensure_that do
        EM.next_tick do
          $log.debug { "TaskRunner: we're outta here" }
          EM.stop_event_loop
        end
      end
      runner.defer!
    end
  end
end

ThreadpoolJobSubtasks.run!