Class: Utilrb::ThreadPool::Task

Inherits:
Object
  • Object
show all
Defined in:
lib/utilrb/thread_pool.rb

Overview

A Task is executed by the thread pool as soon as a free thread is available.

Author:

Constant Summary collapse

Asked =
Class.new(Exception)

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = Hash.new, *args, &block) ⇒ Task

A new task which can be added to the work queue of a Utilrb::ThreadPool. If a sync key is given no task having the same key will be executed in parallel which is useful for instance member calls which are not thread safe.

Parameters:

  • options (Hash) (defaults to: Hash.new)

    The options of the task.

  • args (Array)

    The arguments for the code block

  • block (#call)

    The code block

Options Hash (options):

  • :sync_key (Object)

    The sync key

  • :callback (Proc)

    The callback

  • :default (Object)

    Default value returned when an error ocurred which was handled.



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/utilrb/thread_pool.rb', line 128

def initialize (options = Hash.new,*args, &block)
    unless block
        raise ArgumentError, 'you must pass a work block to initialize a new Task.'
    end
    options = Kernel.validate_options(options,{:sync_key => nil,:default => nil,:callback => nil})
    @sync_key = options[:sync_key]
    @arguments = args
    @default = options[:default]
    @callback = options[:callback]
    @block = block
    @mutex = Mutex.new
    @pool = nil
    @state_temp = nil
    @state = nil
    reset
end

Instance Attribute Details

#descriptionObject

Custom description which can be used to store a human readable object



77
78
79
# File 'lib/utilrb/thread_pool.rb', line 77

def description
  @description
end

#exceptionException (readonly)

The exception thrown by the custom code block

Returns:



50
51
52
# File 'lib/utilrb/thread_pool.rb', line 50

def exception
  @exception
end

#poolThreadPool (readonly)

Thread pool the task belongs to

Returns:



40
41
42
# File 'lib/utilrb/thread_pool.rb', line 40

def pool
  @pool
end

#queued_atObject

The time the task was queued

return [Time] the time



60
61
62
# File 'lib/utilrb/thread_pool.rb', line 60

def queued_at
  @queued_at
end

#resultObject (readonly)

Result of the code block call



73
74
75
# File 'lib/utilrb/thread_pool.rb', line 73

def result
  @result
end

#started_atObject (readonly)

The time the task was started

return [Time] the time



65
66
67
# File 'lib/utilrb/thread_pool.rb', line 65

def started_at
  @started_at
end

#state:waiting, ... (readonly)

State of the task

Returns:

  • (:waiting, :running, :stopping, :finished, :terminated, :exception)

    the state



45
46
47
# File 'lib/utilrb/thread_pool.rb', line 45

def state
  @state
end

#stopped_atObject (readonly)

The time the task was stopped or finished

return [Time] the time



70
71
72
# File 'lib/utilrb/thread_pool.rb', line 70

def stopped_at
  @stopped_at
end

#sync_keyObject (readonly)

The sync key is used to speifiy that a given task must not run in paralles with another task having the same sync key. If no key is set there are no such constrains for the taks.

Returns:

  • the sync key



35
36
37
# File 'lib/utilrb/thread_pool.rb', line 35

def sync_key
  @sync_key
end

#threadObject (readonly)

The thread the task was assigned to

return [Thread] the thread



55
56
57
# File 'lib/utilrb/thread_pool.rb', line 55

def thread
  @thread
end

Instance Method Details

#callback {|Object, Exception| ... } ⇒ Object

Called from the worker thread when the work is done

Yields:



231
232
233
234
235
# File 'lib/utilrb/thread_pool.rb', line 231

def callback(&block)
    @mutex.synchronize do
        @callback = block
    end
end

#default?Boolean

returns true if the task has a default return vale

Returns:

  • (Boolean)


164
165
166
167
168
# File 'lib/utilrb/thread_pool.rb', line 164

def default?
     @mutex.synchronize do 
         @default != nil
     end
end

#exception?Boolean

Checks if an exception occurred.

Returns:

  • (Boolean)


115
# File 'lib/utilrb/thread_pool.rb', line 115

def exception?; @state == :exception; end

#executeObject

Executes the task. Should be called from a worker thread after pre_execute was called. After execute returned and the task was deleted from any internal list finalize must be called to propagate the task state.

Raises:

  • (RuntimeError)


188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/utilrb/thread_pool.rb', line 188

def execute()
    raise RuntimeError, "call pre_execute ThreadPool::Task first. Current state is #{@state} but :running was expected" if @state != :running
    @state_temp = begin
                @result = @block.call(*@arguments)
                :finished
            rescue Exception => e
                @exception = e
                if e.is_a? Asked
                    :terminated
                else
                    :exception
                end
            end
    @stopped_at = Time.now
end

#finalizeObject

propagates the tasks state should be called after execute



206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/utilrb/thread_pool.rb', line 206

def finalize
    @mutex.synchronize do
        @thread = nil
        @state = @state_temp
        @pool = nil
    end
    begin
        @callback.call @result,@exception if @callback
    rescue Exception => e
        ThreadPool.report_exception("thread_pool: in #{self}, callback #{@callback} failed", e)
    end
end

#finished?Boolean

Checks if the task was stopped or finished. This also includes cases where an exception was raised by the custom code block.

Returns:

  • (Boolean)


99
# File 'lib/utilrb/thread_pool.rb', line 99

def finished?; started? && !running? && !stopping?; end

#pre_execute(pool = nil) ⇒ Object

sets all internal state to running call execute after that.



172
173
174
175
176
177
178
179
180
181
# File 'lib/utilrb/thread_pool.rb', line 172

def pre_execute(pool=nil)
    @mutex.synchronize do 
        #store current thread to be able to terminate
        #the thread
        @pool = pool
        @thread = Thread.current
        @started_at = Time.now
        @state = :running
    end
end

#resetObject

Resets the tasks. This can be used to requeue a task that is already finished



147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/utilrb/thread_pool.rb', line 147

def reset
    if finished? || !started?
        @mutex.synchronize do
            @result = @default
            @state = :waiting
            @exception = nil
            @started_at = nil
            @queued_at = nil
            @stopped_at = nil
        end
    else
        raise RuntimeError,"cannot reset a task which is not finished"
    end
end

#running?Boolean

Checks if the task is running

Returns:

  • (Boolean)


87
# File 'lib/utilrb/thread_pool.rb', line 87

def running?; @state == :running; end

#started?Boolean

Checks if the task was started

Returns:

  • (Boolean)


82
# File 'lib/utilrb/thread_pool.rb', line 82

def started?; @state != :waiting; end

#stopping?Boolean

Checks if the task is going to be stopped

Returns:

  • (Boolean)


92
# File 'lib/utilrb/thread_pool.rb', line 92

def stopping?; @state == :stopping; end

#successfull?Boolean

Checks if the task was successfully finished. This means no exceptions, termination or timed out occurred

Returns:

  • (Boolean)


105
# File 'lib/utilrb/thread_pool.rb', line 105

def successfull?; @state == :finished; end

#terminate!(exception = Asked) ⇒ Object

Terminates the task if it is running



220
221
222
223
224
225
226
# File 'lib/utilrb/thread_pool.rb', line 220

def terminate!(exception = Asked)
    @mutex.synchronize do
        return unless running?
        @state = :stopping
        @thread.raise exception
    end
end

#terminated?Boolean

Checks if the task was terminated.

Returns:

  • (Boolean)


110
# File 'lib/utilrb/thread_pool.rb', line 110

def terminated?; @state == :terminated; end

#time_elapsed(time = Time.now) ⇒ Object

Returns the number of seconds the task is or was running at the given point in time

@return

Parameters:

  • time (Time) (defaults to: Time.now)

    The point in time.



242
243
244
245
246
247
248
249
250
251
# File 'lib/utilrb/thread_pool.rb', line 242

def time_elapsed(time = Time.now)
    #no need to synchronize here
    if @stopped_at
        (@stopped_at-@started_at).to_f
    elsif @started_at
        (time-@started_at).to_f
    else
        0
    end
end