Class: Chef::ChefFS::Parallelizer::ParallelEnumerable
- Inherits:
-
Object
- Object
- Chef::ChefFS::Parallelizer::ParallelEnumerable
- Includes:
- Enumerable
- Defined in:
- lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
Defined Under Namespace
Classes: RestrictedLazy
Instance Attribute Summary collapse
-
#block ⇒ Object
readonly
Returns the value of attribute block.
-
#input_enumerable ⇒ Object
readonly
Returns the value of attribute input_enumerable.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#parent_task_queue ⇒ Object
readonly
Returns the value of attribute parent_task_queue.
Instance Method Summary collapse
- #count(*args, &block) ⇒ Object
- #drop(n) ⇒ Object
- #each ⇒ Object
- #each_with_exceptions(&block) ⇒ Object
- #each_with_index ⇒ Object
- #each_with_input ⇒ Object
- #first(n = nil) ⇒ Object
- #flatten(levels = nil) ⇒ Object
-
#initialize(parent_task_queue, input_enumerable, options = {}, &block) ⇒ ParallelEnumerable
constructor
options: :ordered [true|false] - whether the output should stay in the same order as the input (even though it may not actually be processed in that order).
- #lazy ⇒ Object
- #original_count ⇒ Object
- #original_lazy ⇒ Object
-
#restricted_copy(enumerable) ⇒ Object
Enumerable methods.
- #take(n) ⇒ Object
- #wait ⇒ Object
Constructor Details
#initialize(parent_task_queue, input_enumerable, options = {}, &block) ⇒ ParallelEnumerable
options: :ordered [true|false] - whether the output should stay in the same order
as the input (even though it may not actually be processed in that
order). Default: true
:stop_on_exception [true|false] - if true, when an exception occurs in either
input or output, we wait for any outstanding processing to complete,
but will not process any new inputs. Default: false
:main_thread_processing [true|false] - whether the main thread pulling
on each() is allowed to process inputs. Default: true
NOTE: If you set this to false, parallelizer.kill will stop each()
in its tracks, so you need to know for sure that won't happen.
20 21 22 23 24 25 26 27 28 29 |
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 20 def initialize(parent_task_queue, input_enumerable, = {}, &block) @parent_task_queue = parent_task_queue @input_enumerable = input_enumerable @options = @block = block @unconsumed_input = Queue.new @in_process = {} @unconsumed_output = Queue.new end |
Instance Attribute Details
#block ⇒ Object (readonly)
Returns the value of attribute block.
34 35 36 |
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 34 def block @block end |
#input_enumerable ⇒ Object (readonly)
Returns the value of attribute input_enumerable.
32 33 34 |
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 32 def input_enumerable @input_enumerable end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
33 34 35 |
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 33 def @options end |
#parent_task_queue ⇒ Object (readonly)
Returns the value of attribute parent_task_queue.
31 32 33 |
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 31 def parent_task_queue @parent_task_queue end |
Instance Method Details
#count(*args, &block) ⇒ Object
87 88 89 90 91 92 93 |
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 87 def count(*args, &block) if args.size == 0 && block.nil? @input_enumerable.count else original_count(*args, &block) end end |
#drop(n) ⇒ Object
103 104 105 |
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 103 def drop(n) restricted_copy(@input_enumerable.drop(n)).to_a end |
#each ⇒ Object
36 37 38 39 40 |
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 36 def each each_with_input do |output, index, input, type| yield output end end |
#each_with_exceptions(&block) ⇒ Object
64 65 66 67 68 69 70 |
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 64 def each_with_exceptions(&block) if @options[:ordered] == false each_with_exceptions_unordered(&block) else each_with_exceptions_ordered(&block) end end |
#each_with_index ⇒ Object
42 43 44 45 46 |
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 42 def each_with_index each_with_input do |output, index, input| yield output, index end end |
#each_with_input ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 48 def each_with_input exception = nil each_with_exceptions do |output, index, input, type| if type == :exception if @options[:ordered] == false exception ||= output else raise output end else yield output, index, input end end raise exception if exception end |
#first(n = nil) ⇒ Object
95 96 97 98 99 100 101 |
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 95 def first(n = nil) if n restricted_copy(@input_enumerable.first(n)).to_a else first(1)[0] end end |
#flatten(levels = nil) ⇒ Object
107 108 109 |
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 107 def flatten(levels = nil) FlattenEnumerable.new(self, levels) end |
#lazy ⇒ Object
139 140 141 |
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 139 def lazy RestrictedLazy.new(self, original_lazy) end |
#original_count ⇒ Object
85 |
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 85 alias :original_count :count |
#original_lazy ⇒ Object
137 |
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 137 alias :original_lazy :lazy |
#restricted_copy(enumerable) ⇒ Object
Enumerable methods
81 82 83 |
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 81 def restricted_copy(enumerable) ParallelEnumerable.new(@parent_task_queue, enumerable, @options, &@block) end |
#take(n) ⇒ Object
111 112 113 |
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 111 def take(n) restricted_copy(@input_enumerable.take(n)).to_a end |
#wait ⇒ Object
72 73 74 75 76 77 78 |
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 72 def wait exception = nil each_with_exceptions_unordered do |output, index, input, type| exception ||= output if type == :exception end raise exception if exception end |