Class: Chef::ChefFS::Parallelizer::ParallelEnumerable

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/chef/chef_fs/parallelizer/parallel_enumerable.rb

Defined Under Namespace

Classes: RestrictedLazy

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(parent_task_queue, input_enumerable, options = {}, &block) ⇒ ParallelEnumerable

options: :ordered [true|false] - whether the output should stay in the same order as the input (even though it may not actually be processed in that order). Default: true :stop_on_exception [true|false] - if true, when an exception occurs in either input or output, we wait for any outstanding processing to complete, but will not process any new inputs. Default: false :main_thread_processing [true|false] - whether the main thread pulling on each() is allowed to process inputs. Default: true NOTE: If you set this to false, parallelizer.kill will stop each() in its tracks, so you need to know for sure that won't happen.


20
21
22
23
24
25
26
27
28
29
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 20

def initialize(parent_task_queue, input_enumerable, options = {}, &block)
  @parent_task_queue = parent_task_queue
  @input_enumerable = input_enumerable
  @options = options
  @block = block

  @unconsumed_input = Queue.new
  @in_process = {}
  @unconsumed_output = Queue.new
end

Instance Attribute Details

#blockObject (readonly)

Returns the value of attribute block


34
35
36
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 34

def block
  @block
end

#input_enumerableObject (readonly)

Returns the value of attribute input_enumerable


32
33
34
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 32

def input_enumerable
  @input_enumerable
end

#optionsObject (readonly)

Returns the value of attribute options


33
34
35
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 33

def options
  @options
end

#parent_task_queueObject (readonly)

Returns the value of attribute parent_task_queue


31
32
33
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 31

def parent_task_queue
  @parent_task_queue
end

Instance Method Details

#count(*args, &block) ⇒ Object


87
88
89
90
91
92
93
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 87

def count(*args, &block)
  if args.size == 0 && block.nil?
    @input_enumerable.count
  else
    original_count(*args, &block)
  end
end

#drop(n) ⇒ Object


103
104
105
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 103

def drop(n)
  restricted_copy(@input_enumerable.drop(n)).to_a
end

#eachObject


36
37
38
39
40
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 36

def each
  each_with_input do |output, index, input, type|
    yield output
  end
end

#each_with_exceptions(&block) ⇒ Object


64
65
66
67
68
69
70
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 64

def each_with_exceptions(&block)
  if @options[:ordered] == false
    each_with_exceptions_unordered(&block)
  else
    each_with_exceptions_ordered(&block)
  end
end

#each_with_indexObject


42
43
44
45
46
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 42

def each_with_index
  each_with_input do |output, index, input|
    yield output, index
  end
end

#each_with_inputObject


48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 48

def each_with_input
  exception = nil
  each_with_exceptions do |output, index, input, type|
    if type == :exception
      if @options[:ordered] == false
        exception ||= output
      else
        raise output
      end
    else
      yield output, index, input
    end
  end
  raise exception if exception
end

#first(n = nil) ⇒ Object


95
96
97
98
99
100
101
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 95

def first(n = nil)
  if n
    restricted_copy(@input_enumerable.first(n)).to_a
  else
    first(1)[0]
  end
end

#flatten(levels = nil) ⇒ Object


107
108
109
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 107

def flatten(levels = nil)
  FlattenEnumerable.new(self, levels)
end

#lazyObject


139
140
141
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 139

def lazy
  RestrictedLazy.new(self, original_lazy)
end

#original_countObject


85
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 85

alias :original_count :count

#original_lazyObject


137
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 137

alias :original_lazy :lazy

#restricted_copy(enumerable) ⇒ Object

Enumerable methods


81
82
83
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 81

def restricted_copy(enumerable)
  ParallelEnumerable.new(@parent_task_queue, enumerable, @options, &@block)
end

#take(n) ⇒ Object


111
112
113
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 111

def take(n)
  restricted_copy(@input_enumerable.take(n)).to_a
end

#waitObject


72
73
74
75
76
77
78
# File 'lib/chef/chef_fs/parallelizer/parallel_enumerable.rb', line 72

def wait
  exception = nil
  each_with_exceptions_unordered do |output, index, input, type|
    exception ||= output if type == :exception
  end
  raise exception if exception
end