Class: Chef::ChefFS::Parallelizer
- Inherits:
-
Object
- Object
- Chef::ChefFS::Parallelizer
show all
- Defined in:
- lib/chef/chef_fs/parallelizer.rb,
lib/chef/chef_fs/parallelizer/flatten_enumerable.rb,
lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
Overview
Tries to balance several guarantees, in order of priority:
1. Don't get deadlocked.
2. Provide results in the desired order.
3. Provide results as soon as they are available.
4. Process input as soon as possible.
Defined Under Namespace
Classes: FlattenEnumerable, ParallelEnumerable
Constant Summary
collapse
- @@parallelizer =
nil
- @@threads =
0
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(num_threads) ⇒ Parallelizer
Returns a new instance of Parallelizer.
32
33
34
35
36
37
|
# File 'lib/chef/chef_fs/parallelizer.rb', line 32
# Builds a parallelizer with its own task queue and worker threads.
#
# Starts with an empty task Queue, an empty thread list and an empty
# stop-flag table (a Hash keyed by thread), then hands +num_threads+ to
# #resize to bring the pool to the requested size.
#
# @param num_threads [Integer] number of worker threads passed to #resize
def initialize(num_threads)
@tasks = Queue.new
@threads = []
@stop_thread = {}
# Must come last: resize reads @threads and may write @stop_thread.
resize(num_threads)
end
|
Class Method Details
.parallel_do(enumerable, options = {}, &block) ⇒ Object
28
29
30
|
# File 'lib/chef/chef_fs/parallelizer.rb', line 28
# Class-level convenience: forwards +enumerable+, +options+ and the block
# unchanged to #parallel_do on the instance returned by .parallelizer.
#
# @param enumerable [Object] passed through as-is
# @param options [Hash] passed through as-is
# @return [Object] whatever the instance-level #parallel_do returns
def self.parallel_do(enumerable, options = {}, &block)
parallelizer.parallel_do(enumerable, options, &block)
end
|
.parallelize(enumerable, options = {}, &block) ⇒ Object
24
25
26
|
# File 'lib/chef/chef_fs/parallelizer.rb', line 24
# Class-level convenience: forwards +enumerable+, +options+ and the block
# unchanged to #parallelize on the instance returned by .parallelizer.
#
# @param enumerable [Object] passed through as-is
# @param options [Hash] passed through as-is
# @return [Object] whatever the instance-level #parallelize returns
def self.parallelize(enumerable, options = {}, &block)
parallelizer.parallelize(enumerable, options, &block)
end
|
.parallelizer ⇒ Object
20
21
22
|
# File 'lib/chef/chef_fs/parallelizer.rb', line 20
# Returns the Parallelizer stored in @@parallelizer, creating it on first
# use with the thread count currently held in @@threads.
#
# @return [Parallelizer] the memoized instance
def self.parallelizer
  # Build the instance only when none has been stored yet.
  unless @@parallelizer
    @@parallelizer = Parallelizer.new(@@threads)
  end
  @@parallelizer
end
|
.threads=(value) ⇒ Object
15
16
17
18
|
# File 'lib/chef/chef_fs/parallelizer.rb', line 15
# Records the desired thread count in @@threads and, when a Parallelizer
# has already been stored in @@parallelizer, resizes it to match.
#
# @param value [Integer] desired number of worker threads
def self.threads=(value)
  @@threads = value
  # Nothing to resize until an instance has been created.
  return unless @@parallelizer

  @@parallelizer.resize(value)
end
|
Instance Method Details
#kill ⇒ Object
78
79
80
81
82
83
84
|
# File 'lib/chef/chef_fs/parallelizer.rb', line 78
# Kills every thread currently in the pool and forgets it.
#
# Each thread is killed and its entry is removed from the stop-flag table;
# afterwards the pool's thread list is replaced with an empty array.
#
# @return [Array] the new, empty thread list
def kill
  @threads.each do |worker|
    worker.kill
    @stop_thread.delete(worker)
  end
  @threads = []
end
|
#num_threads ⇒ Object
39
40
41
|
# File 'lib/chef/chef_fs/parallelizer.rb', line 39
# Number of threads currently held in the pool's thread list.
#
# @return [Integer]
def num_threads
  @threads.length
end
|
#parallel_do(enumerable, options = {}, &block) ⇒ Object
47
48
49
|
# File 'lib/chef/chef_fs/parallelizer.rb', line 47
# Runs the block over +enumerable+ through this parallelizer's task queue
# and waits for completion.
#
# The caller's +options+ are copied with :ordered forced to false (the
# original hash is not modified), a ParallelEnumerable is built from them,
# and its #wait result is returned.
#
# @param enumerable [Object] input handed to ParallelEnumerable
# @param options [Hash] extra options; any :ordered entry is overridden
# @return [Object] whatever ParallelEnumerable#wait returns
def parallel_do(enumerable, options = {}, &block)
  unordered_options = options.merge(ordered: false)
  runner = ParallelEnumerable.new(@tasks, enumerable, unordered_options, &block)
  runner.wait
end
|
#parallelize(enumerable, options = {}, &block) ⇒ Object
43
44
45
|
# File 'lib/chef/chef_fs/parallelizer.rb', line 43
# Wraps +enumerable+ in a ParallelEnumerable bound to this parallelizer's
# task queue, passing +options+ and the block through unchanged.
# Unlike #parallel_do, this does not call #wait on the result.
#
# @param enumerable [Object] input handed to ParallelEnumerable
# @param options [Hash] passed through as-is
# @return [ParallelEnumerable] the newly constructed wrapper
def parallelize(enumerable, options = {}, &block)
ParallelEnumerable.new(@tasks, enumerable, options, &block)
end
|
#resize(to_threads, wait = true, timeout = nil) ⇒ Object
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
# File 'lib/chef/chef_fs/parallelizer.rb', line 55
# Grows or shrinks the pool of worker threads to +to_threads+.
#
# Shrinking: the surplus threads (index +to_threads+ and above) are removed
# from the thread list and flagged in the stop-flag table. When +wait+ is
# truthy each of them is joined; +timeout+ is a single budget shared by all
# joins, not a per-thread allowance.
#
# Growing: new threads running #worker_loop are started until the pool holds
# +to_threads+ threads. Requesting the current size does nothing.
#
# @param to_threads [Integer] desired number of worker threads
# @param wait [Boolean] whether to join the threads being stopped
# @param timeout [Numeric, nil] total seconds to spend joining; nil means no limit
def resize(to_threads, wait = true, timeout = nil)
  if to_threads < num_threads
    threads_to_stop = @threads[to_threads..num_threads - 1]
    @threads = @threads.slice(0, to_threads)
    # NOTE(review): presumably worker_loop (not shown here) polls this flag
    # to decide when to exit - confirm.
    threads_to_stop.each { |thread| @stop_thread[thread] = true }
    if wait
      # Use the monotonic clock: wall-clock time can jump (NTP, manual
      # changes), which would stretch or shrink the remaining budget.
      started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      threads_to_stop.each do |thread|
        remaining = nil
        if timeout
          elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
          # Once the budget is spent, poll the remaining threads without blocking.
          remaining = [timeout - elapsed, 0].max
        end
        thread.join(remaining)
      end
    end
  else
    num_threads.upto(to_threads - 1) do |i|
      @threads[i] = Thread.new(&method(:worker_loop))
    end
  end
end
|
#stop(wait = true, timeout = nil) ⇒ Object
51
52
53
|
# File 'lib/chef/chef_fs/parallelizer.rb', line 51
# Shrinks the pool to zero threads by delegating to #resize with a target
# of 0, forwarding +wait+ and +timeout+ unchanged.
#
# @param wait [Boolean] forwarded to #resize
# @param timeout [Numeric, nil] forwarded to #resize
# @return [Object] whatever #resize returns
def stop(wait = true, timeout = nil)
resize(0, wait, timeout)
end
|