Class: Bolt::FiberExecutor

Inherits:
Object
  • Object
show all
Defined in:
lib/bolt/fiber_executor.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeFiberExecutor

Returns a new instance of FiberExecutor.



10
11
12
13
14
15
# File 'lib/bolt/fiber_executor.rb', line 10

def initialize
  @logger = Bolt::Logger.logger(self)
  @id = 0
  @active_futures = []
  @finished_futures = []
end

Instance Attribute Details

#active_futuresObject (readonly)

Returns the value of attribute active_futures.



8
9
10
# File 'lib/bolt/fiber_executor.rb', line 8

def active_futures
  @active_futures
end

#finished_futuresObject (readonly)

Returns the value of attribute finished_futures.



8
9
10
# File 'lib/bolt/fiber_executor.rb', line 8

def finished_futures
  @finished_futures
end

Instance Method Details

#all_futuresObject



115
116
117
# File 'lib/bolt/fiber_executor.rb', line 115

def all_futures
  active_futures + finished_futures
end

#create_future(plan_id:, scope: nil, name: nil) ⇒ Object

Creates a new Puppet scope from the current Plan scope so that variables can be used inside the block and won’t interact with the outer scope. Then creates a new Fiber to execute the block, wraps the Fiber in a Bolt::PlanFuture, and returns the Bolt::PlanFuture.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/bolt/fiber_executor.rb', line 28

def create_future(plan_id:, scope: nil, name: nil)
  newscope = nil
  if scope
    # Save existing variables to the new scope before starting the future
    # itself so that if the plan returns before the backgrounded block
    # starts, we still have the variables.
    newscope = Puppet::Parser::Scope.new(scope.compiler)
    local = Puppet::Parser::Scope::LocalScope.new

    # Compress the current scopes into a single vars hash to add to the new scope
    scope.to_hash(true, true).each_pair { |k, v| local[k] = v }
    newscope.push_ephemerals([local])
  end

  # Create a new Fiber that will execute the provided block.
  future = Fiber.new do
    # Yield the new scope - this should be ignored by the block if
    # `newscope` is nil.
    yield newscope
  end

  # PlanFutures are assigned an ID, which is just a global incrementing
  # integer. The main plan should always have ID 0. They also have a
  # plan_id, which identifies which plan spawned them. This is used for
  # tracking which Futures to wait on when `wait()` is called without
  # arguments.
  @id += 1
  future = Bolt::PlanFuture.new(future, @id, name: name, plan_id: plan_id, scope: newscope)
  @logger.trace("Created future #{future.name}")

  # Register the PlanFuture with the FiberExecutor to be executed
  active_futures << future
  future
end

#get_current_future(fiber:) ⇒ Object

Get the PlanFuture object that is currently executing



121
122
123
# File 'lib/bolt/fiber_executor.rb', line 121

def get_current_future(fiber:)
  all_futures.select { |f| f.fiber == fiber }.first
end

#get_current_plan_id(fiber:) ⇒ Object

Get the plan invocation ID for the PlanFuture that is currently executing



127
128
129
# File 'lib/bolt/fiber_executor.rb', line 127

def get_current_plan_id(fiber:)
  get_current_future(fiber: fiber).current_plan
end

#get_futures_for_plan(plan_id:) ⇒ Object

Get the Future objects associated with a particular plan invocation.



133
134
135
# File 'lib/bolt/fiber_executor.rb', line 133

def get_futures_for_plan(plan_id:)
  all_futures.select { |f| f.original_plan == plan_id }
end

#in_parallel?Boolean

Whether there is more than one fiber running in parallel.

Returns:

  • (Boolean)


19
20
21
# File 'lib/bolt/fiber_executor.rb', line 19

def in_parallel?
  active_futures.length > 1
end

#plan_complete?Boolean

Whether all PlanFutures have finished executing, indicating that the entire plan (main plan and any PlanFutures it spawned) has finished and Bolt can exit.

Returns:

  • (Boolean)


111
112
113
# File 'lib/bolt/fiber_executor.rb', line 111

def plan_complete?
  active_futures.empty?
end

#round_robinObject

Visit each PlanFuture registered with the FiberExecutor and resume it. Fibers will yield themselves back, either if they kicked off a long-running process or if the current long-running process hasn’t completed. If the Fiber finishes after being resumed, store the result in the PlanFuture and remove the PlanFuture from the FiberExecutor.



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/bolt/fiber_executor.rb', line 69

def round_robin
  active_futures.each do |future|
    # If the Fiber is still running and can be resumed, then resume it.
    # Override Puppet's global_scope to prevent ephemerals in other scopes
    # from being popped off in the wrong order due to race conditions.
    # This primarily happens when running executor functions from custom
    # Puppet language functions, but may happen elsewhere.
    @logger.trace("Checking future '#{future.name}'")
    if future.alive?
      @logger.trace("Resuming future '#{future.name}'")
      Puppet.override(global_scope: future.scope) { future.resume }
    end

    # Once we've restarted the Fiber, check to see if it's finished again
    # and cleanup if it has.
    next if future.alive?
    @logger.trace("Cleaning up future '#{future.name}'")

    # If the future errored and the main plan has already exited, log the
    # error at warn level.
    unless active_futures.map(&:id).include?(0) || future.state == "done"
      Bolt::Logger.warn('errored_futures', "Error in future '#{future.name}': #{future.value}")
    end

    # Remove the PlanFuture from the FiberExecutor.
    finished_futures.push(active_futures.delete(future))
  end

  # If the Fiber immediately returned or if the Fiber is blocking on a
  # `wait` call, Bolt should pause for long enough that something can
  # execute before checking again. This mitigates CPU
  # thrashing.
  return unless active_futures.all? { |f| %i[returned_immediately unfinished].include?(f.value) }
  @logger.trace("Nothing can be resumed. Rechecking in 0.5 seconds.")

  sleep(0.5)
end

#wait(futures, timeout: nil, catch_errors: false, **_kwargs) ⇒ Object

Block until the provided PlanFuture objects have finished, or the timeout is reached.



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/bolt/fiber_executor.rb', line 139

def wait(futures, timeout: nil, catch_errors: false, **_kwargs)
  if futures.nil?
    results = []
    plan_id = get_current_plan_id(fiber: Fiber.current)
    # Recollect the futures for this plan until all of the futures have
    # finished. This ensures that we include futures created inside of
    # futures being waited on.
    until (futures = get_futures_for_plan(plan_id: plan_id)).map(&:alive?).none?
      if futures.map(&:fiber).include?(Fiber.current)
        msg = "The wait() function cannot be called with no arguments inside a "\
              "background block in the same plan."
        raise Bolt::Error.new(msg, 'bolt/infinite-wait')
      end
      # Wait for all the futures we know about so far before recollecting
      # Futures for the plan and waiting again
      results = wait(futures, timeout: timeout, catch_errors: catch_errors)
    end
    return results
  end

  if timeout.nil?
    Fiber.yield(:unfinished) until futures.map(&:alive?).none?
  else
    start = Time.now
    Fiber.yield(:unfinished) until (Time.now - start > timeout) || futures.map(&:alive?).none?
    # Raise an error for any futures that are still alive
    futures.each do |f|
      if f.alive?
        f.raise(Bolt::FutureTimeoutError.new(f.name, timeout))
      end
    end
  end

  results = futures.map(&:value)

  failed_indices = results.each_index.select do |i|
    results[i].is_a?(Bolt::Error)
  end

  if failed_indices.any?
    if catch_errors
      failed_indices.each { |i| results[i] = results[i].to_puppet_error }
    else
      # Do this after handling errors for simplicity and pretty printing
      raise Bolt::ParallelFailure.new(results, failed_indices)
    end
  end

  results
end