Class: Utilrb::ThreadPool::Task
Overview
A Task is executed by the thread pool as soon as a free thread is available.
Constant Summary collapse
- Asked =
Class.new(Exception)
Instance Attribute Summary collapse
-
#description ⇒ Object
Custom description which can be used to store a human readable object.
-
#exception ⇒ Exception
readonly
The exception thrown by the custom code block.
-
#pool ⇒ ThreadPool
readonly
Thread pool the task belongs to.
-
#queued_at ⇒ Object
The time the task was queued .
-
#result ⇒ Object
readonly
Result of the code block call.
-
#started_at ⇒ Object
readonly
The time the task was started.
-
#state ⇒ :waiting, ...
readonly
State of the task.
-
#stopped_at ⇒ Object
readonly
The time the task was stopped or finished.
-
#sync_key ⇒ Object
readonly
The sync key is used to speifiy that a given task must not run in paralles with another task having the same sync key.
-
#thread ⇒ Object
readonly
The thread the task was assigned to.
Instance Method Summary collapse
-
#callback {|Object, Exception| ... } ⇒ Object
Called from the worker thread when the work is done.
-
#default? ⇒ Boolean
returns true if the task has a default return vale.
-
#exception? ⇒ Boolean
Checks if an exception occurred.
-
#execute ⇒ Object
Executes the task.
-
#finalize ⇒ Object
propagates the tasks state should be called after execute.
-
#finished? ⇒ Boolean
Checks if the task was stopped or finished.
-
#initialize(options = Hash.new, *args, &block) ⇒ Task
constructor
A new task which can be added to the work queue of a Utilrb::ThreadPool.
-
#pre_execute(pool = nil) ⇒ Object
sets all internal state to running call execute after that.
-
#reset ⇒ Object
Resets the tasks.
-
#running? ⇒ Boolean
Checks if the task is running.
-
#started? ⇒ Boolean
Checks if the task was started.
-
#stopping? ⇒ Boolean
Checks if the task is going to be stopped.
-
#successfull? ⇒ Boolean
Checks if the task was successfully finished.
-
#terminate!(exception = Asked) ⇒ Object
Terminates the task if it is running.
-
#terminated? ⇒ Boolean
Checks if the task was terminated.
-
#time_elapsed(time = Time.now) ⇒ Object
Returns the number of seconds the task is or was running at the given point in time.
Constructor Details
#initialize(options = Hash.new, *args, &block) ⇒ Task
A new task which can be added to the work queue of a Utilrb::ThreadPool. If a sync key is given no task having the same key will be executed in parallel which is useful for instance member calls which are not thread safe.
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/utilrb/thread_pool.rb', line 128 def initialize ( = Hash.new,*args, &block) unless block raise ArgumentError, 'you must pass a work block to initialize a new Task.' end = Kernel.(,{:sync_key => nil,:default => nil,:callback => nil}) @sync_key = [:sync_key] @arguments = args @default = [:default] @callback = [:callback] @block = block @mutex = Mutex.new @pool = nil @state_temp = nil @state = nil reset end |
Instance Attribute Details
#description ⇒ Object
Custom description which can be used to store a human readable object
77 78 79 |
# File 'lib/utilrb/thread_pool.rb', line 77 def description @description end |
#exception ⇒ Exception (readonly)
The exception thrown by the custom code block
50 51 52 |
# File 'lib/utilrb/thread_pool.rb', line 50 def exception @exception end |
#pool ⇒ ThreadPool (readonly)
Thread pool the task belongs to
40 41 42 |
# File 'lib/utilrb/thread_pool.rb', line 40 def pool @pool end |
#queued_at ⇒ Object
The time the task was queued
return [Time] the time
60 61 62 |
# File 'lib/utilrb/thread_pool.rb', line 60 def queued_at @queued_at end |
#result ⇒ Object (readonly)
Result of the code block call
73 74 75 |
# File 'lib/utilrb/thread_pool.rb', line 73 def result @result end |
#started_at ⇒ Object (readonly)
The time the task was started
return [Time] the time
65 66 67 |
# File 'lib/utilrb/thread_pool.rb', line 65 def started_at @started_at end |
#state ⇒ :waiting, ... (readonly)
State of the task
45 46 47 |
# File 'lib/utilrb/thread_pool.rb', line 45 def state @state end |
#stopped_at ⇒ Object (readonly)
The time the task was stopped or finished
return [Time] the time
70 71 72 |
# File 'lib/utilrb/thread_pool.rb', line 70 def stopped_at @stopped_at end |
#sync_key ⇒ Object (readonly)
The sync key is used to speifiy that a given task must not run in paralles with another task having the same sync key. If no key is set there are no such constrains for the taks.
35 36 37 |
# File 'lib/utilrb/thread_pool.rb', line 35 def sync_key @sync_key end |
#thread ⇒ Object (readonly)
The thread the task was assigned to
return [Thread] the thread
55 56 57 |
# File 'lib/utilrb/thread_pool.rb', line 55 def thread @thread end |
Instance Method Details
#callback {|Object, Exception| ... } ⇒ Object
Called from the worker thread when the work is done
231 232 233 234 235 |
# File 'lib/utilrb/thread_pool.rb', line 231 def callback(&block) @mutex.synchronize do @callback = block end end |
#default? ⇒ Boolean
returns true if the task has a default return vale
164 165 166 167 168 |
# File 'lib/utilrb/thread_pool.rb', line 164 def default? @mutex.synchronize do @default != nil end end |
#exception? ⇒ Boolean
Checks if an exception occurred.
115 |
# File 'lib/utilrb/thread_pool.rb', line 115 def exception?; @state == :exception; end |
#execute ⇒ Object
Executes the task. Should be called from a worker thread after pre_execute was called. After execute returned and the task was deleted from any internal list finalize must be called to propagate the task state.
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/utilrb/thread_pool.rb', line 188 def execute() raise RuntimeError, "call pre_execute ThreadPool::Task first. Current state is #{@state} but :running was expected" if @state != :running @state_temp = begin @result = @block.call(*@arguments) :finished rescue Exception => e @exception = e if e.is_a? Asked :terminated else :exception end end @stopped_at = Time.now end |
#finalize ⇒ Object
propagates the tasks state should be called after execute
206 207 208 209 210 211 212 213 214 215 216 217 |
# File 'lib/utilrb/thread_pool.rb', line 206 def finalize @mutex.synchronize do @thread = nil @state = @state_temp @pool = nil end begin @callback.call @result,@exception if @callback rescue Exception => e ThreadPool.report_exception("thread_pool: in #{self}, callback #{@callback} failed", e) end end |
#finished? ⇒ Boolean
Checks if the task was stopped or finished. This also includes cases where an exception was raised by the custom code block.
99 |
# File 'lib/utilrb/thread_pool.rb', line 99 def finished?; started? && !running? && !stopping?; end |
#pre_execute(pool = nil) ⇒ Object
sets all internal state to running call execute after that.
172 173 174 175 176 177 178 179 180 181 |
# File 'lib/utilrb/thread_pool.rb', line 172 def pre_execute(pool=nil) @mutex.synchronize do #store current thread to be able to terminate #the thread @pool = pool @thread = Thread.current @started_at = Time.now @state = :running end end |
#reset ⇒ Object
Resets the tasks. This can be used to requeue a task that is already finished
147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/utilrb/thread_pool.rb', line 147 def reset if finished? || !started? @mutex.synchronize do @result = @default @state = :waiting @exception = nil @started_at = nil @queued_at = nil @stopped_at = nil end else raise RuntimeError,"cannot reset a task which is not finished" end end |
#running? ⇒ Boolean
Checks if the task is running
87 |
# File 'lib/utilrb/thread_pool.rb', line 87 def running?; @state == :running; end |
#started? ⇒ Boolean
Checks if the task was started
82 |
# File 'lib/utilrb/thread_pool.rb', line 82 def started?; @state != :waiting; end |
#stopping? ⇒ Boolean
Checks if the task is going to be stopped
92 |
# File 'lib/utilrb/thread_pool.rb', line 92 def stopping?; @state == :stopping; end |
#successfull? ⇒ Boolean
Checks if the task was successfully finished. This means no exceptions, termination or timed out occurred
105 |
# File 'lib/utilrb/thread_pool.rb', line 105 def successfull?; @state == :finished; end |
#terminate!(exception = Asked) ⇒ Object
Terminates the task if it is running
220 221 222 223 224 225 226 |
# File 'lib/utilrb/thread_pool.rb', line 220 def terminate!(exception = Asked) @mutex.synchronize do return unless running? @state = :stopping @thread.raise exception end end |
#terminated? ⇒ Boolean
Checks if the task was terminated.
110 |
# File 'lib/utilrb/thread_pool.rb', line 110 def terminated?; @state == :terminated; end |
#time_elapsed(time = Time.now) ⇒ Object
Returns the number of seconds the task is or was running at the given point in time
242 243 244 245 246 247 248 249 250 251 |
# File 'lib/utilrb/thread_pool.rb', line 242 def time_elapsed(time = Time.now) #no need to synchronize here if @stopped_at (@stopped_at-@started_at).to_f elsif @started_at (time-@started_at).to_f else 0 end end |