Class: ThreadStorm

Inherits:
Object show all
Defined in:
lib/thread_storm.rb,
lib/thread_storm/queue.rb,
lib/thread_storm/worker.rb,
lib/thread_storm/execution.rb

Overview

Simple but powerful thread pool implementation.

Defined Under Namespace

Classes: Execution, Queue, TimeoutError, Worker

Constant Summary collapse

VERSION =

Version of ThreadStorm that you are using.

File.read(File.dirname(__FILE__)+"/../VERSION").chomp
DEFAULTS =

Default options found in ThreadStorm.options.

{ :size => 2,
:execute_blocks => false,
:timeout => nil,
:timeout_method => Timeout.method(:timeout),
:timeout_exception => Timeout::Error,
:default_value => nil,
:reraise => true }.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ ThreadStorm

call-seq:

new(options = {}) -> thread_storm
new(options = {}){ |self| ... } -> thread_storm

Valid options are…

:size => How many threads to spawn.
:timeout => Max time an execution is allowed to run before terminating it.  Nil means no timeout.
:timeout_method => An object that implements something like Timeout.timeout via #call..
:default_value => Value of an execution if it times out or errors..
:reraise => True if you want exceptions to be reraised when ThreadStorm#join is called.
:execute_blocks => True if you want #execute to block until there is an available thread.

For defaults, see DEFAULTS.

When given a block, ThreadStorm#join and ThreadStorm#shutdown are called for you. In other words…

ThreadStorm.new do |storm|
  storm.execute{ sleep(1) }
end

…is the same as…

storm = ThreadStorm.new
storm.execute{ sleep(1) }
storm.join
storm.shutdown


60
61
62
63
64
65
66
# File 'lib/thread_storm.rb', line 60

def initialize(options = {})
  @options = options.reverse_merge(self.class.options)
  @queue = Queue.new(@options[:size], @options[:execute_blocks])
  @executions = []
  @workers = (1..@options[:size]).collect{ Worker.new(@queue) }
  run{ yield(self) } if block_given?
end

Instance Attribute Details

#executionsObject (readonly)

Array of executions in order as they are defined by calls to ThreadStorm#execute.



35
36
37
# File 'lib/thread_storm.rb', line 35

def executions
  @executions
end

#optionsObject (readonly)

Options specific to a ThreadStorm instance.



32
33
34
# File 'lib/thread_storm.rb', line 32

def options
  @options
end

Instance Method Details

#clear_executions(method_name = nil, &block) ⇒ Object

Removes executions stored at ThreadStorm#executions. You can selectively remove them by passing in a block or a symbol. The following two lines are equivalent.

storm.clear_executions(:finished?)
storm.clear_executions{ |e| e.finished? }

Because of the nature of threading, the following code could happen:

storm.clear_executions(:finished?)
storm.executions.any?{ |e| e.finished? }

Some executions could have finished between the two calls.



160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/thread_storm.rb', line 160

def clear_executions(method_name = nil, &block)
  cleared, @executions = @executions.separate do |execution|
    if block_given?
      yield(execution)
    elsif method_name.nil?
      true
    else
      execution.send(method_name)
    end
  end
  cleared
end

#execute(*args, &block) ⇒ Object

call-seq:

storm.execute(*args){ |*args| ... } -> execution
storm.execute(execution) -> execution

Schedules an execution to be run (i.e. moves it to the :queued state). When given a block, it is the same as

execution = ThreadStorm::Execution.new(*args){ |*args| ... }
storm.execute(execution)


107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/thread_storm.rb', line 107

def execute(*args, &block)
  if block_given?
    execution = new_execution(*args, &block)
  elsif args.length == 1 and args.first.instance_of?(Execution)
    execution = args.first
  else
    raise ArgumentError, "execution or arguments and block expected"
  end
  
  @queue.synchronize do |q|
    q.enqueue(execution)
    execution.queued! # This needs to be in here or we'll get a race condition to set the execution's state.
  end
  
  @executions << execution
  
  execution
end

#joinObject

Block until all pending executions are finished running. Reraises any exceptions caused by executions unless :reraise => false was passed to ThreadStorm#new.



128
129
130
131
132
# File 'lib/thread_storm.rb', line 128

def join
  @executions.each do |execution|
    execution.join
  end
end

#new_execution(*args, &block) ⇒ Object

This is like Execution.new except the default options are specific this ThreadStorm instance.

ThreadStorm.options[:timeout]
# => nil
storm = ThreadStorm.new :timeout => 1
execution = storm.new_execution
execution.options[:timeout]
# => 1
execution = ThreadStorm::Execution.new
execution.options[:timeout]
# => nil


78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/thread_storm.rb', line 78

def new_execution(*args, &block)
  
  # It has to be this way because of how options are merged.
  
  if block_given?
    Execution.new(options.dup).define(*args, &block)
  elsif args.length == 0
    Execution.new(options.dup)
  elsif args.length == 1 and args.first.kind_of?(Hash)
    Execution.new(options.merge(args.first))
  else
    raise ArgumentError, "illegal call-seq"
  end
end

#run {|_self| ... } ⇒ Object

Yields:

  • (_self)

Yield Parameters:

  • _self (ThreadStorm)

    the object that the method was called on



93
94
95
96
97
# File 'lib/thread_storm.rb', line 93

def run
  yield(self)
  join
  shutdown
end

#shutdownObject

Signals the worker threads to terminate immediately (ignoring any pending executions) and blocks until they do.



141
142
143
144
145
# File 'lib/thread_storm.rb', line 141

def shutdown
  @queue.shutdown
  threads.each{ |thread| thread.join }
  true
end

#threadsObject

Returns an array of Ruby threads in the pool.



148
149
150
# File 'lib/thread_storm.rb', line 148

def threads
  @workers.collect{ |worker| worker.thread }
end

#valuesObject

Calls ThreadStorm#join, then collects the values of each execution.



135
136
137
# File 'lib/thread_storm.rb', line 135

def values
  join and @executions.collect{ |execution| execution.value }
end