Class: Rutty::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/rutty/thread_pool/worker.rb

Overview

The ThreadPool Thread wrapper, representing a single thread of execution in the pool.

Since:

  • 2.4.0

Instance Method Summary collapse

Constructor Details

#initialize(thread_queue) ⇒ Worker

Initialize a new Rutty::Worker. Sets up the initial state of this instance, then spins out its thread. The thread will wait until it’s signaled (by being passed a new closure to execute, or being told to die, etc.), then resume execution of its task. The Thread is wrapped in a while @running loop, allowing for easy termination from #shutdown.

Parameters:

  • thread_queue (Queue)

    The thread queue from the ThreadPool instance to add this worker to

Since:

  • 2.4.0



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/rutty/thread_pool/worker.rb', line 19

def initialize thread_queue
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  @queue = thread_queue
  @running = true
  
  @thread = Thread.new do
    @mutex.synchronize do
      while @running
        @cv.wait @mutex
        block = get_block
        
        if block
          @mutex.unlock
          block.call
          @mutex.lock
          reset_block
        end
        
        @queue << self
      end
    end
  end
end

Instance Method Details

#busy?Boolean

Whether or not this Thread is currently executing. Determined by whether @block is nil.

Returns:

  • (Boolean)

    Is this Thread executing?

Since:

  • 2.4.0



85
86
87
# File 'lib/rutty/thread_pool/worker.rb', line 85

def busy?
  @mutex.synchronize { !@block.nil? }
end

#get_blockProc, ...

Returns the value of @block.

Returns:

  • (Proc, Lambda, #call)

    The @block object previously passed.

Since:

  • 2.4.0



56
57
58
# File 'lib/rutty/thread_pool/worker.rb', line 56

def get_block
  @block
end

#nameString

Returns the evaluation of @thread.inspect.

Returns:

  • (String)

    This thread’s name

Since:

  • 2.4.0



48
49
50
# File 'lib/rutty/thread_pool/worker.rb', line 48

def name
  @thread.inspect
end

#reset_blockvoid

Sets the @block to nil. Called after the Thread has finished execution of a task to ready itself for the next one.

Since:

  • 2.4.0



77
78
79
# File 'lib/rutty/thread_pool/worker.rb', line 77

def reset_block
  @block = nil
end

#set_block(block) ⇒ void

Sets a new closure as the task for this worker. Signals the Thread to wakeup after assignment.

Parameters:

  • block (Proc, Lambda, #call)

    An instance of any class that responds to #call

Raises:

  • (RuntimeError)

    If the thread is currently busy when called.

Since:

  • 2.4.0



65
66
67
68
69
70
71
72
# File 'lib/rutty/thread_pool/worker.rb', line 65

def set_block block
  @mutex.synchronize do
    raise RuntimeError, "Thread already busy." if @block
    @block = block
    # Signal the thread in this class, that there's a job to be done
    @cv.signal
  end
end

#stopvoid

Sets @running to false, causing the Thread to finish after the current loop. Signals the Thread to wake and resume, then calls #join.

See Also:

Since:

  • 2.4.0



94
95
96
97
98
99
100
101
# File 'lib/rutty/thread_pool/worker.rb', line 94

def stop
  @mutex.synchronize do
    @running = false
    @cv.signal
  end
  
  @thread.join
end