require 'rubygems'
require 'eventmachine'
require 'deferred'

require 'logger'

$log = Logger.new($stderr).tap { |l| l.level = Logger::DEBUG }

# This is a real-world example of how we use this gem in a production setting.
#
# Surrounding this would be a daemon that's listening for events that represent
# asynchronous tasks that need to be run (what the simplified TaskRunner class
# below represents). We have components in our stack that are not evented, so we
# need to run tasks on the threadpool. In addition, some of those tasks (like the
# MainTask below) represent a "meta" task, a task that may have several
# sub-components that can be run simultaneously. An example of this kind of task
# would be one that needs to retrieve multiple resources from the web. The "meta"
# task is "retrieve all sources," which can be divided into N number of tasks that
# can be run concurrently.
#
# This code performs the following steps:
#
# * The TaskRunner creates a MainTask, and sets things up so that it can run its 
#   process! method in the threadpool
#
# * The TaskRunner block is scheduled and the MainTask is run
#
# * The MainTask process! method:
#   * sets up a callback that will call the handle_success method on successful
#     completion of all subtasks (using the on_iterator_finished event)
#   * sets up a callback that will call handle_failure in the case of any
#     SubThreadTask failure (using the on_iterator_finished event)
#   * sets up a next_tick block and returns self, effectively freeing the
#     thread. (This is technically kind of a waste of the use of a thread, but
#     for consistency with the rest of our task system, it's easier than
#     special casing this)
#
# * Since MainTask is a Deferred, the TaskRunner will chain itself to the MainTask,
#   meaning that when the MainTask calls back, the TaskRunner will inspect its return
#   value and take the appropriate action (it will either be done, or if it's another
#   Deferred it will wait on *that*, and if it's also a ThreadpoolJob it will
#   call defer! on that return value). 
#
# * the reactor ticks
#
# * The next_tick block runs an EM::Iterator to create SubThreadTasks which
#   will be run in their own threads. When all subtasks are complete it fires
#   the on_iterator_finished event. If any SubThreadTask fails,
#   on_iterator_finished will errback immediately, all running tasks will
#   complete, but will be ignored.
#
# * The SubThreadTasks all complete
#
# * on_iterator_finished is fired and in order to handle the result MainTask will
#   need to perform another ThreadpoolJob. We set up the ThreadpoolJob, and call
#   succeed on the MainTask with that ThreadpoolJob as an argument. The MainTask
#   will now depend on that job's succesful completion
#
# * the cleanup task runs in a separate thread
#
# * the cleanup task completes and the main TaskRunner job is done.
#
#
# Now, this may seem like complete overkill, however it shows how flexible the
# ThreadpoolJob is (credit to Topper Bowers who figured out the recursive
# behavior for ThreadpoolJob#handle_result, ALL PRAISE TO THE HYPNOTOAD!). We
# have a myriad of different solutions, some sync, some async. The flexibility
# of Deferred allows us to leverage EventMachine to manage the overall
# concurrency in a sensible way, but use the threadpool freely, and therefore
# give us an incredibly powerful tool.
#
module ThreadpoolJobSubtasks
  class SubThreadTask
    include Deferred::ThreadpoolJob

    attr_accessor :pause_for_sec

    def initialize(pause_for_sec)
      @pause_for_sec = pause_for_sec
      @on_run_block = method(:process!).to_proc
    end

    def process!
      $log.debug { "SubThreadTask: Thread id: %16x sleeping %0.9f sec" % [Thread.current.object_id, @pause_for_sec] }
      sleep(@pause_for_sec)
      self.succeed("SubThreadTask: Thread id: %16x complete %0.9f sec" % [Thread.current.object_id, @pause_for_sec] )
    end
  end

  class MainTask
    include Deferred
    include Deferred::Accessors

    deferred_event :iterator_finished

    def initialize
      @durations = []
    end

    def wait_for_durations=(durations)
      @durations = durations
    end

    def process!
      $log.debug { "MainTask#process! called, going to schedule sleeps for #{@durations.inspect} seconds concurrently" }

      on_iterator_finished.callback { |*| handle_success }
      on_iterator_finished.errback  { |e| handle_failure(e) }

      EM.next_tick do
        EM::Iterator.new(@durations.dup, 5).each(
          lambda { |dur,iter|
            tpj = SubThreadTask.new(dur)
            tpj.before_run.callback { $log.debug { "SubThreadTask: about to sleep #{dur}" } }
            tpj.callback { |s| $log.debug(s); @durations.delete(dur); iter.next }
            tpj.errback  { |e| on_iterator_finished.fail(e) }
            tpj.defer!
          },
          lambda { 
            $log.debug { "MainTask: all SubThreadTasks completed, @durations: #{@durations.inspect}" }
            on_iterator_finished.succeed 
          }
        )

      end

      self
    end

    # on success, we need to perform another blocking action, so we return a
    # ThreadpoolJob instance (actually, anything that responds_to?(:defer!))
    def handle_success
      $log.debug { "MainTask: all SubThreadTasks have completed running successfully, now we do some other action on the threadpool before we're done" }

      Deferred::DefaultThreadpoolJob.new.tap do |dtj|
        dtj.on_run do
          sleep(1.0)
        end

        dtj.before_run do
          $log.debug { "MainTask: start some blocking action after success" }
        end

        dtj.callback do
          $log.debug { "MainTask: some blocking action after success finished" }
        end

        dtj.errback { |e| raise e }

        # here we do something fairly tricky.
        # since this is all done for side-effect, we can signal that we're done processing
        # (i.e. the main task is complete), but do some kind of asynchronous "next task"
        # by returning a ThreadpoolJob.
        $log.debug { "MainTask: succeeding the main task with another ThreadpoolJob" }

        self.succeed(dtj)
      end
    end

    def handle_failure(exc)
      $log.debug { "Oh noes! an error occurred! #{exc.inspect}" }
      self.fail(exc)
    end
  end

  # This needs to run MainTask so that we don't get re-scheduled over and over
  # (as if MainTask responds_to?(:defer!) it'll get deferred repeatedly)
  class TaskRunner
    include Deferred::ThreadpoolJob
  end

  def self.run!
    sub_task_durations = []

    20.times { sub_task_durations << rand }

    EM.run do
      main_task = MainTask.new.tap do |main|
        main.wait_for_durations = sub_task_durations
      end

      TaskRunner.new.tap do |tr|
        tr.on_run { main_task.process! }

        tr.callback { $log.debug { "TaskRunner: main task completed" } }
        tr.errback  { |e| raise e }

        tr.ensure_that do 
          EM.next_tick do
            $log.debug { "TaskRunner: we're outta here" }
            EM.stop_event_loop
          end
        end

        tr.defer!
      end
    end
  end
end

ThreadpoolJobSubtasks.run!