Module: Deferred::ThreadpoolJob

Includes:
Accessors, InstanceMethods
Included in:
DefaultThreadpoolJob
Defined in:
lib/deferred/threadpool_job.rb

Overview

Adds several methods that simplify using a Deferred as a mechanism for handling the results of an EM.defer call. see #defer!

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from InstanceMethods

#callback, #chain_err, #chain_to, #ensure_that, #errback, #errback_on_exception, #timeout

Instance Attribute Details

#on_run_blockObject

Returns the value of attribute on_run_block.



10
11
12
# File 'lib/deferred/threadpool_job.rb', line 10

def on_run_block
  @on_run_block
end

Instance Method Details

#call_on_run_blockObject (protected)



39
40
41
42
43
# File 'lib/deferred/threadpool_job.rb', line 39

def call_on_run_block
  @on_run_block.call
rescue Exception => exc
  exc
end

#defer!Object

Calls the @on_run_block as the first argument to EM.defer, and uses the result (the value returned from calling the block) as the deferred result that will be used to call the registered callbacks. If the block raises an exception, that exception instance will be the argument used to call the registered errbacks.

Raises:

  • (OnRunBlockNotRegisteredError)

    when defer! has been called without an on_run block assigned (i.e. with no work to defer to a threadpool)



27
28
29
30
31
32
33
34
35
36
# File 'lib/deferred/threadpool_job.rb', line 27

def defer!
  raise OnRunBlockNotRegisteredError unless @on_run_block

  EM.next_tick do 
    before_run.succeed
    EM.defer(method(:call_on_run_block).to_proc, method(:handle_completion).to_proc)
  end

  self
end

#handle_completion(*tp_result) ⇒ Object (protected)

after cb above runs, this method is called in the reactor



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/deferred/threadpool_job.rb', line 46

def handle_completion(*tp_result)
  first = tp_result.first

  if first.kind_of?(Exception)
    self.fail(first)
  elsif first.respond_to?(:callback) and first.respond_to?(:errback)
    first.callback { |*a| handle_completion(*a) }
    first.errback  { |*a| handle_completion(*a) }
    if first.respond_to?(:defer!)
      EM.schedule { first.defer! }
    end
  else
    self.succeed(*tp_result)
  end
end

#on_run(&block) ⇒ Object

Used with the defer! method. the block given will be the block run in the EM.threadpool (via EM.defer). see #defer!



14
15
16
# File 'lib/deferred/threadpool_job.rb', line 14

def on_run(&block)
  @on_run_block = block
end