Class: Rake::ThreadPool

Inherits:
Object
Defined in:
lib/rake/thread_pool.rb

Overview

:nodoc: all

Constructor Details

#initialize(thread_count) ⇒ ThreadPool

Creates a ThreadPool object. The thread_count parameter is the size of the pool.

# File 'lib/rake/thread_pool.rb', line 12

def initialize(thread_count)
  @max_active_threads = [thread_count, 0].max
  @threads = Set.new
  @threads_mon = Monitor.new
  @queue = Queue.new
  @join_cond = @threads_mon.new_cond

  @history_start_time = nil
  @history = []
  @history_mon = Monitor.new
  @total_threads_in_play = 0
end
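
The listing above clamps thread_count at zero, so a negative size never reaches the rest of the pool. A minimal sketch of that behaviour, assuming the rake gem is installed and reading the stored limit back through #statistics; the helper name pool_limit is made up for this illustration. Since the class is marked :nodoc:, it is best treated as internal API that may change between Rake versions.

```ruby
require "rake"
require "rake/thread_pool" # usually already loaded by "rake"; harmless if so

# Hypothetical helper: builds a pool and reports the limit it actually stored.
def pool_limit(requested)
  Rake::ThreadPool.new(requested).statistics[:max_active_threads]
end

puts pool_limit(4)  # a positive size is kept as given
puts pool_limit(-3) # a negative size is clamped to zero
```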

Instance Method Details

#future(*args, &block) ⇒ Object

Creates a future executed by the ThreadPool.

The args are passed to the block when it executes (similar to Thread.new). The return value is an object representing a future that has been created and added to the pool's queue. Sending #value to that object blocks the current thread until the future has finished, then returns the result (or re-raises an exception raised inside the future).

# File 'lib/rake/thread_pool.rb', line 33

def future(*args, &block)
  promise = Promise.new(args, &block)
  promise.recorder = lambda { |*stats| stat(*stats) }

  @queue.enq promise
  stat :queued, item_id: promise.object_id
  start_thread
  promise
end
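
A short usage sketch of #future, assuming the rake gem is installed. It shows arguments being forwarded to the block and an exception resurfacing from #value. The method name future_demo and the values used are illustrative only.

```ruby
require "rake"
require "rake/thread_pool" # usually already loaded by "rake"; harmless if so

# Hypothetical demo: runs one successful and one failing future on a small pool.
def future_demo
  pool = Rake::ThreadPool.new(2)

  # Arguments given to #future are handed to the block.
  sum = pool.future(2, 3) { |a, b| a + b }
  failing = pool.future { raise ArgumentError, "boom" }

  total = sum.value # waits until the result is available

  # An exception raised inside the block is re-raised by #value.
  message = begin
    failing.value
  rescue ArgumentError => e
    e.message
  end

  pool.join
  [total, message]
end

p future_demo
```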

#gather_history ⇒ Object

Enable the gathering of history events.

# File 'lib/rake/thread_pool.rb', line 68

def gather_history          #:nodoc:
  @history_start_time = Time.now if @history_start_time.nil?
end

#history ⇒ Object

Return an array of history events for the thread pool.

History gathering must be enabled to be able to see the events (see #gather_history). Best to call this when the job is complete (i.e. after ThreadPool#join is called).

# File 'lib/rake/thread_pool.rb', line 77

def history                 # :nodoc:
  @history_mon.synchronize { @history.dup }.
    sort_by { |i| i[:time] }.
    each { |i| i[:time] -= @history_start_time }
end
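
A sketch of the intended call order, assuming the rake gem is installed: enable history first, run the work, join, then read the events. The :time key comes from the listing above; the :event key is an assumption about what Rake's internal recorder stores and is not shown on this page. As far as the listing shows, #history subtracts the start time from each stored entry in place, so the sketch reads it only once.

```ruby
require "rake"
require "rake/thread_pool" # usually already loaded by "rake"; harmless if so

# Hypothetical demo: records pool events while a single future runs.
def history_demo
  pool = Rake::ThreadPool.new(2)
  pool.gather_history         # must be enabled before the work to be recorded
  pool.future { :done }.value
  pool.join
  pool.history                # read once, after the pool has drained
end

# Each entry is a hash; :time is the offset in seconds from gather_history.
history_demo.each do |entry|
  puts format("%.6f  %s", entry[:time], entry[:event])
end
```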

#join ⇒ Object

Waits until the queue of futures is empty and all threads have exited.

# File 'lib/rake/thread_pool.rb', line 44

def join
  @threads_mon.synchronize do
    begin
      stat :joining
      @join_cond.wait unless @threads.empty?
      stat :joined
    rescue Exception => e
      stat :joined
      $stderr.puts e
      $stderr.print "Queue contains #{@queue.size} items. " +
        "Thread pool contains #{@threads.count} threads\n"
      $stderr.print "Current Thread #{Thread.current} status = " +
        "#{Thread.current.status}\n"
      $stderr.puts e.backtrace.join("\n")
      @threads.each do |t|
        $stderr.print "Thread #{t} status = #{t.status}\n"
        $stderr.puts t.backtrace.join("\n")
      end
      raise e
    end
  end
end
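
A sketch of #join used as a barrier for work whose return values are not needed, assuming the rake gem is installed. The pool is sized to the number of jobs so that every queued future gets a worker thread; join_demo and the 0.05 second sleep are illustrative choices, not part of Rake.

```ruby
require "rake"
require "rake/thread_pool" # usually already loaded by "rake"; harmless if so

# Hypothetical demo: queue side-effect-only work, then wait for the pool to drain.
def join_demo(jobs = 3)
  pool = Rake::ThreadPool.new(jobs) # one thread available per job
  finished = Queue.new              # thread-safe collector for completed jobs

  jobs.times do |n|
    pool.future(n) do |i|
      sleep 0.05                    # stand-in for real work
      finished << i
    end
  end

  pool.join                         # returns once every worker thread has exited
  finished.size
end

puts join_demo
```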

#statistics ⇒ Object

Return a hash of always-collected statistics for the thread pool.

# File 'lib/rake/thread_pool.rb', line 84

def statistics              #  :nodoc:
  {
    total_threads_in_play: @total_threads_in_play,
    max_active_threads: @max_active_threads,
  }
end
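
A sketch of reading the statistics after some work has run, assuming the rake gem is installed. Unlike #history, no opt-in call is needed. The page does not say how total_threads_in_play is maintained; reading it as the largest number of threads alive at once is an assumption about Rake's internals.

```ruby
require "rake"
require "rake/thread_pool" # usually already loaded by "rake"; harmless if so

# Hypothetical demo: runs one future, then returns the pool's statistics hash.
def statistics_demo
  pool = Rake::ThreadPool.new(3)
  pool.future { :work }.value
  pool.join
  pool.statistics
end

stats = statistics_demo
puts "limit:   #{stats[:max_active_threads]}"
puts "in play: #{stats[:total_threads_in_play]}"
```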