Class: ThreadLimiter

Inherits:
Object
  • Object
show all
Defined in:
lib/threadlimiter/threadlimiter.rb,
lib/threadlimiter/version.rb

Overview

Fork threads like Thread.fork, but limit the number of concurrently running threads.

ThreadLimiter isn’t a thread pool. Each fork really starts a new thread.

Constant Summary collapse

VERSION =
"0.2.1"

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(limit, options = {}) ⇒ ThreadLimiter

Initialize the ThreadLimiter. The optional parameter limit is the maximum number of concurrently running threads. Set limit to -1 to fork threads without limiting the number of concurrently running threads. Set limit to 0 to join the newly created thread immediately, mimicking no-forking. Set options to true to start the new thread before waiting for resources.



25
26
27
28
29
30
31
32
# File 'lib/threadlimiter/threadlimiter.rb', line 25

def initialize(limit, options={})
  @limit	= limit	# The maximum number of concurrently running threads.
  @running	= 0	# The number of currently running threads.
  @noblock	= options[:noblock]

  @mutex	= Mutex.new
  @cv		= ConditionVariable.new
end

Class Method Details

.handle_clusters(enumeration, number_of_clusters, method_name, &block) ⇒ Object

:nodoc:



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/threadlimiter/threadlimiter.rb', line 64

def self.handle_clusters(enumeration, number_of_clusters, method_name, &block)	# :nodoc:
  clusters	= []	# One cluster per fork.
  last_pos	= -1
  res		= []

  enumeration.each do |object|
    last_pos += 1

    (clusters[last_pos%number_of_clusters] ||= []) << object
  end

  clusters.__send__(method_name, -1) do |cluster|
    cluster.collect do |object|
      if block.arity > 1 and object.kind_of?(Enumerable)
        yield(*object.to_a)
      else
        yield(object)
      end
    end
  end.collect do |cluster|
    cluster + (cluster.length == clusters[0].length ? [] : [nil])	# Add padding nil, in order to be able to transpose
  end.transpose.each do |array|
    res.concat(array)
  end

  res[0..last_pos]	# Remove padding nil.
end

.open(*args) ⇒ Object

Create and use a new ThreadLimiter and wait for all threads to finish.



9
10
11
12
13
14
15
16
17
# File 'lib/threadlimiter/threadlimiter.rb', line 9

def self.open(*args)
  thread_limiter	= new(*args)

  begin
    yield(thread_limiter)
  ensure
    thread_limiter.wait
  end
end

Instance Method Details

#fork(*args, &block) ⇒ Object

Fork a thread. The given block is run within the thread. It behaves like Thread.fork(). In fact, it invokes Thread.fork() and returns its result. The list of arguments is passed to Thread.fork().



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/threadlimiter/threadlimiter.rb', line 40

def fork(*args, &block)
  if @limit < 0
    Thread.fork do
      yield(*args)
    end
  elsif @limit == 0
    Thread.fork do
      yield(*args)
    end.join  # Because ThreadLimiter#fork() should return a new Thread.
  else
    cv_wait	unless @noblock

    Thread.fork do
      cv_wait	if @noblock

      begin
        yield(*args)
      ensure
        cv_signal
      end
    end
  end
end

#waitObject

Wait for all threads to finish.



94
95
96
97
98
99
100
101
102
# File 'lib/threadlimiter/threadlimiter.rb', line 94

def wait
  @mutex.synchronize do
    while @running > 0
      @cv.wait(@mutex)
    end
  end

  self
end