Class: ThreadStorm

Inherits:
Object show all
Defined in:
lib/thread_storm.rb,
lib/thread_storm/queue.rb,
lib/thread_storm/worker.rb,
lib/thread_storm/execution.rb

Defined Under Namespace

Classes: Execution, Queue, Worker

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ ThreadStorm

Valid options are

:size => How many threads to spawn.  Default is 2.
:timeout => Max time an execution is allowed to run before terminating it.  Default is nil (no timeout).
:timeout_method => An object that implements something like Timeout.timeout via #call.  Default is Timeout.method(:timeout).
:default_value => Value of an execution if it times out or errors.  Default is nil.
:reraise => True if you want exceptions reraised when ThreadStorm#join is called.  Default is true.


19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/thread_storm.rb', line 19

# Builds the pool: merges the caller's options over the defaults, creates the
# shared work queue and spawns one Worker per requested thread.
#
# Valid options are
#   :size           => How many threads to spawn.  Default is 2.
#   :timeout        => Max time an execution is allowed to run before
#                      terminating it.  Default is nil (no timeout).
#   :timeout_method => An object that implements something like
#                      Timeout.timeout via #call.  Default is
#                      Timeout.method(:timeout).
#   :default_value  => Value of an execution if it times out or errors.
#                      Default is nil.
#   :reraise        => True if you want exceptions reraised when
#                      ThreadStorm#join is called.  Default is true.
#
# When a block is given, the new instance is yielded to it, then #join and
# #shutdown are called.  #shutdown runs from an +ensure+ clause so the worker
# threads are stopped even when the block raises or #join re-raises an
# execution's exception; the exception still propagates to the caller.
def initialize(options = {})
  @options = options.option_merge :size => 2,
                                  :timeout => nil,
                                  :timeout_method => Timeout.method(:timeout),
                                  :default_value => nil,
                                  :reraise => true
  @queue = Queue.new # This is threadsafe.
  @executions = []
  @workers = (1..@options[:size]).collect{ Worker.new(@queue, @options) }
  @start_time = Time.now
  if block_given?
    begin
      yield(self)
      join
    ensure
      # Without this, a failure above would leave every worker thread running.
      shutdown
    end
  end
end

Instance Attribute Details

#executionsObject (readonly)

Array of executions in order as defined by calls to ThreadStorm#execute.



11
12
13
# File 'lib/thread_storm.rb', line 11

# Returns the array of executions, in the order they were created by calls
# to ThreadStorm#execute.  This is the live internal array, not a copy.
def executions
  @executions
end

Instance Method Details

#default_valueObject



40
41
42
# File 'lib/thread_storm.rb', line 40

# Returns the :default_value option: the value an execution has if it times
# out or errors.  It is nil unless the caller supplied one to the constructor.
def default_value
  @options[:default_value]
end

#execute(*args, &block) ⇒ Object

Creates an execution and schedules it to be run by the thread pool. Return value is a ThreadStorm::Execution.



50
51
52
53
54
55
56
# File 'lib/thread_storm.rb', line 50

# Wraps the given arguments and block in a new Execution, seeds its value
# with the pool's default value, records it in the executions list and pushes
# it onto the work queue.  Returns the Execution.
def execute(*args, &block)
  scheduled = Execution.new(args, &block)
  scheduled.value = default_value
  @executions << scheduled
  @queue.push(scheduled)
  scheduled
end

#joinObject

Block until all pending executions are finished running. Reraises any exceptions caused by executions unless :reraise => false was passed to ThreadStorm#new.



60
61
62
63
64
65
# File 'lib/thread_storm.rb', line 60

# Waits for each execution in turn.  After an execution finishes, its
# exception (if any) is raised when re-raising is enabled, so executions
# after the first failing one are not waited on.  Returns the executions
# array when nothing is raised.
def join
  @executions.each do |pending|
    pending.join
    next unless pending.exception && reraise?

    raise pending.exception
  end
end

#reraise?Boolean

Returns:

  • (Boolean)


44
45
46
# File 'lib/thread_storm.rb', line 44

# Returns the :reraise option, which #join consults to decide whether an
# execution's exception is raised again.  The option defaults to true; the
# stored value is returned as-is.
def reraise?
  @options[:reraise]
end

#shutdownObject

Signals the worker threads to terminate immediately (ignoring any pending executions) and blocks until they do.



75
76
77
78
79
80
# File 'lib/thread_storm.rb', line 75

# Stops the pool in three steps: tell every worker to die, tell the queue to
# die, then wait on each worker's thread.  Always returns true.
def shutdown
  @workers.each(&:die!)
  @queue.die!
  @workers.each do |worker|
    worker.thread.join
  end
  true
end

#sizeObject



36
37
38
# File 'lib/thread_storm.rb', line 36

# Returns the :size option: how many threads the pool spawns (2 unless the
# caller supplied another value to the constructor).
def size
  @options[:size]
end

#threadsObject

Returns an array of threads in the pool.



83
84
85
# File 'lib/thread_storm.rb', line 83

# Returns a new array holding each worker's thread, in worker order.
def threads
  @workers.map(&:thread)
end

#valuesObject

Calls ThreadStorm#join, then collects the values of each execution.



68
69
70
71
# File 'lib/thread_storm.rb', line 68

# Calls #join first, then returns an array with the value of every execution,
# in execution order.
def values
  join
  @executions.map(&:value)
end