Class: HybridPlatformsConductor::ActionsExecutor

Inherits:
Object
  • Object
show all
Includes:
LoggerHelpers
Defined in:
lib/hybrid_platforms_conductor/actions_executor.rb

Overview

Gives ways to execute actions on the nodes

Defined Under Namespace

Classes: ConnectionError

Constant Summary

Constants included from LoggerHelpers

LoggerHelpers::LEVELS_MODIFIERS, LoggerHelpers::LEVELS_TO_STDERR

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from LoggerHelpers

#err, #init_loggers, #log_component=, #log_debug?, #log_level=, #out, #section, #set_loggers_format, #stderr_device, #stderr_device=, #stderr_displayed?, #stdout_device, #stdout_device=, #stdout_displayed?, #stdouts_to_s, #with_progress_bar

Constructor Details

#initialize(logger: Logger.new(STDOUT), logger_stderr: Logger.new(STDERR), config: Config.new, cmd_runner: CmdRunner.new, nodes_handler: NodesHandler.new) ⇒ ActionsExecutor

Constructor

Parameters
  • logger (Logger): Logger to be used [default = Logger.new(STDOUT)]

  • logger_stderr (Logger): Logger to be used for stderr [default = Logger.new(STDERR)]

  • config (Config): Config to be used. [default = Config.new]

  • cmd_runner (CmdRunner): Command runner to be used. [default = CmdRunner.new]

  • nodes_handler (NodesHandler): Nodes handler to be used. [default = NodesHandler.new]



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/hybrid_platforms_conductor/actions_executor.rb', line 38

# Constructor
#
# Parameters::
# * *logger* (Logger): Logger to be used [default = Logger.new(STDOUT)]
# * *logger_stderr* (Logger): Logger to be used for stderr [default = Logger.new(STDERR)]
# * *config* (Config): Config to be used. [default = Config.new]
# * *cmd_runner* (CmdRunner): Command runner to be used. [default = CmdRunner.new]
# * *nodes_handler* (NodesHandler): Nodes handler to be used. [default = NodesHandler.new]
def initialize(logger: Logger.new(STDOUT), logger_stderr: Logger.new(STDERR), config: Config.new, cmd_runner: CmdRunner.new, nodes_handler: NodesHandler.new)
  init_loggers(logger, logger_stderr)
  @config = config
  @cmd_runner = cmd_runner
  @nodes_handler = nodes_handler
  # Default number of threads used for concurrent executions
  @max_threads = 16
  # Both plugins registries share the loggers of this executor
  loggers = { logger: @logger, logger_stderr: @logger_stderr }
  @action_plugins = Plugins.new(:action, **loggers)
  # Connector plugins are instantiated with every dependency this executor holds.
  # Instance variables are read when the proc is called, not when it is defined.
  connector_factory = proc do |plugin_class|
    plugin_class.new(
      logger: @logger,
      logger_stderr: @logger_stderr,
      config: @config,
      cmd_runner: @cmd_runner,
      nodes_handler: @nodes_handler
    )
  end
  @connector_plugins = Plugins.new(:connector, **loggers, init_plugin: connector_factory)
end

Instance Attribute Details

#max_threads ⇒ Object

Maximum number of threads to spawn in parallel [default: 16]

Integer


28
29
30
# File 'lib/hybrid_platforms_conductor/actions_executor.rb', line 28

# Maximum number of threads to spawn in parallel.
# The constructor initializes it to 16, and the --max-threads command-line option overrides it.
#
# Result::
# * Integer: The maximum number of threads
def max_threads
  @max_threads
end

Instance Method Details

#connector(connector_name) ⇒ Object

Get a given connector

Parameters
  • connector_name (Symbol): The connector name

Result
  • Connector or nil: The connector, or nil if none found



246
247
248
# File 'lib/hybrid_platforms_conductor/actions_executor.rb', line 246

# Get a given connector
#
# Parameters::
# * *connector_name* (Symbol): The connector name
# Result::
# * Connector or nil: The connector, or nil if none found
def connector(connector_name)
  # Plain lookup in the connector plugins set up by the constructor
  @connector_plugins[connector_name]
end

#execute_actions(actions_per_nodes, timeout: nil, concurrent: false, log_to_dir: "#{@config.hybrid_platforms_dir}/run_logs", log_to_stdout: true, progress_name: 'Executing actions') ⇒ Object

Execute actions on nodes.

Parameters
  • actions_per_nodes (Hash<Object, Hash<Symbol,Object> or Array< Hash<Symbol,Object> > >): Actions (as a Hash of actions or a list of Hash), per nodes selector. See NodesHandler#select_nodes for details about possible nodes selectors. See each action’s setup in actions directory to know about the possible action types and data.

  • timeout (Integer): Timeout in seconds, or nil if none. [default: nil]

  • concurrent (Boolean): Do we run the commands in parallel? If yes, then stdout of commands is stored in log files. [default: false]

  • log_to_dir (String or nil): Directory name to store log files. Can be nil to not store log files. [default: “#{@config.hybrid_platforms_dir}/run_logs”]

  • log_to_stdout (Boolean): Do we log the command result on stdout? [default: true]

  • progress_name (String): Name to display on the progress bar [default: ‘Executing actions’]

Result
  • Hash<String, [Integer or Symbol, String, String]>: Exit status code (or Symbol in case of error or dry run), standard output and error for each node.



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/hybrid_platforms_conductor/actions_executor.rb', line 105

# Execute actions on nodes.
#
# Parameters::
# * *actions_per_nodes* (Hash<Object, Hash<Symbol,Object> or Array< Hash<Symbol,Object> > >): Actions (a Hash of actions, or a list of such Hashes), per nodes selector. Selectors are resolved using NodesHandler#select_nodes.
# * *timeout* (Integer): Timeout in seconds, or nil if none. [default: nil]
# * *concurrent* (Boolean): Do we run the commands in parallel? [default: false]
# * *log_to_dir* (String or nil): Directory name to store log files. Can be nil to not store log files. [default: "#{@config.hybrid_platforms_dir}/run_logs"]
# * *log_to_stdout* (Boolean): Do we log the command result on stdout? [default: true]
# * *progress_name* (String): Name to display on the progress bar [default: 'Executing actions']
# Result::
# * Hash<String, [Integer or Symbol, String, String]>: Exit status code (or Symbol in case of error or dry run), standard output and error for each node.
def execute_actions(
  actions_per_nodes,
  timeout: nil,
  concurrent: false,
  log_to_dir: "#{@config.hybrid_platforms_dir}/run_logs",
  log_to_stdout: true,
  progress_name: 'Executing actions'
)
  # Nodes having at least 1 action that requires a connector
  remote_nodes = []
  # Ordered list of instantiated actions, per node
  actions_per_node = {}
  actions_per_nodes.each do |nodes_selector, nodes_actions|
    # A single Hash of actions is handled like a list of 1 Hash
    actions_sets = nodes_actions.is_a?(Array) ? nodes_actions : [nodes_actions]
    selector_actions = []
    connector_needed = false
    actions_sets.each do |actions_set|
      actions_set.each do |action_type, action_info|
        if concurrent && action_type == :interactive && action_info
          raise 'Cannot have concurrent executions for interactive sessions'
        end
        raise "Unknown action type #{action_type}" unless @action_plugins.key?(action_type)

        action = @action_plugins[action_type].new(
          logger: @logger,
          logger_stderr: @logger_stderr,
          config: @config,
          cmd_runner: @cmd_runner,
          actions_executor: self,
          action_info: action_info
        )
        connector_needed = true if action.need_connector?
        selector_actions << action
      end
    end
    # Nodes are resolved only once all the actions of this selector have been instantiated
    selected_nodes = @nodes_handler.select_nodes(nodes_selector)
    remote_nodes.concat(selected_nodes) if connector_needed
    selected_nodes.each do |node|
      (actions_per_node[node] ||= []).concat(selector_actions)
    end
  end
  # Every selected node gets an entry in the result, even when nothing could be run on it
  result = actions_per_node.keys.map { |node| [node, nil] }.to_h
  with_connections_prepared_to(remote_nodes, no_exception: true) do |connected_nodes|
    # A Symbol in place of a connector reports a failure to get a connector for this node
    failed_connections = connected_nodes.select { |_node, connector| connector.is_a?(Symbol) }
    failed_connections.each do |node, failure|
      result[node] = [failure, '', "Unable to get a connector to #{node}"]
    end
    accessible_nodes = actions_per_node.keys - failed_connections.keys
    logs_details = log_to_dir.nil? ? '' : " (logs dumped in #{log_to_dir})"
    log_debug "Running actions on #{accessible_nodes.size} nodes#{logs_details}"
    next if accessible_nodes.empty?

    # When running in parallel, give each node its own connector instance (meant for thread-safety).
    connectors =
      if concurrent
        connected_nodes.map { |node, connector| [node, connector.clone] }.to_h
      else
        connected_nodes
      end
    @nodes_handler.for_each_node_in(
      accessible_nodes,
      parallel: concurrent,
      nbr_threads_max: @max_threads,
      progress: progress_name
    ) do |node|
      node_actions = actions_per_node[node]
      # When running in parallel, give each node its own actions instances as well.
      node_actions.map!(&:clone) if concurrent
      log_file = log_to_dir.nil? ? nil : "#{log_to_dir}/#{node}.stdout"
      result[node] = execute_actions_on(
        node,
        node_actions,
        connectors[node],
        timeout: timeout,
        log_to_file: log_file,
        log_to_stdout: log_to_stdout
      )
    end
  end
  result
end

#options_parse(options_parser, parallel: true) ⇒ Object

Complete an option parser with options meant to control this Actions Executor

Parameters
  • options_parser (OptionParser): The option parser to complete

  • parallel (Boolean): Do we activate options regarding parallel execution? [default = true]



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/hybrid_platforms_conductor/actions_executor.rb', line 67

# Complete an option parser with options meant to control this Actions Executor
#
# Parameters::
# * *options_parser* (OptionParser): The option parser to complete
# * *parallel* (Boolean): Do we activate options regarding parallel execution? [default = true]
def options_parse(options_parser, parallel: true)
  if parallel
    options_parser.separator ''
    options_parser.separator 'Actions Executor options:'
    # Let the parser validate and convert the value: String#to_i would silently turn
    # any non-numeric input into 0 threads instead of reporting the mistake to the user.
    options_parser.on(
      '-m',
      '--max-threads NBR',
      OptionParser::DecimalInteger,
      "Set the number of threads to use for concurrent queries (defaults to #{@max_threads})"
    ) do |nbr_threads|
      @max_threads = nbr_threads
    end
  end
  # Display options connectors might have
  @connector_plugins.each do |connector_name, connector|
    # Connectors are free not to expose any command-line option
    next unless connector.respond_to?(:options_parse)

    options_parser.separator ''
    options_parser.separator "Connector #{connector_name} options:"
    connector.options_parse(options_parser)
  end
end

#validate_params ⇒ Object

Validate that parsed parameters are valid



86
87
88
89
90
# File 'lib/hybrid_platforms_conductor/actions_executor.rb', line 86

# Validate that parsed parameters are valid.
# Every connector plugin exposing a validate_params method is asked to validate its own parameters.
def validate_params
  @connector_plugins.values.each do |connector|
    next unless connector.respond_to?(:validate_params)

    connector.validate_params
  end
end

#with_connections_prepared_to(nodes, no_exception: false) ⇒ Object

Prepare connections to a set of nodes

Parameters
  • nodes (Array<String>): List of nodes to connect to

  • no_exception (Boolean): Should we continue even if some nodes can’t be connected to? [default: false]

  • Proc: Code called with connections prepared

    • Parameters
      • connected_nodes (Hash<String, Connector or Symbol>): Prepared connectors (or Symbol in case of failure with no_exception), per node name



193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/hybrid_platforms_conductor/actions_executor.rb', line 193

# Prepare connections to a set of nodes
#
# Parameters::
# * *nodes* (Array<String>): List of nodes to connect to
# * *no_exception* (Boolean): Should we continue even if some nodes can't be connected to? [default: false]
# * Proc: Code called with connections prepared
#   * Parameters::
#     * *connected_nodes* (Hash<String, Connector or Symbol>): Prepared connectors (or Symbol in case of failure with no_exception), per node name
def with_connections_prepared_to(nodes, no_exception: false)
  # Connector assigned to each node (nil as long as none has been found)
  connector_per_node = nodes.map { |node| [node, nil] }.to_h
  nodes_lacking_connector = -> { connector_per_node.select { |_node, assigned| assigned.nil? }.keys }
  # The first connector (in plugins order) declaring a node as connectable gets assigned to it
  @connector_plugins.each do |_connector_name, connector|
    candidates = nodes_lacking_connector.call
    break if candidates.empty?

    (connector.connectable_nodes_from(candidates) & candidates).each do |node|
      connector_per_node[node] = connector if connector_per_node[node].nil?
    end
  end
  # Nodes left without any connector are always reported, and are fatal unless no_exception is set
  orphan_nodes = nodes_lacking_connector.call
  unless orphan_nodes.empty?
    message = "The following nodes have no possible connector to them: #{orphan_nodes.sort.join(', ')}"
    log_warn message
    raise message unless no_exception
  end
  # Prepare connectors one inside the other, so that the client code runs while all of them are prepared
  prepare_connectors = proc do |remaining_names|
    connector_name, *next_names = remaining_names
    if connector_name.nil?
      # Every connector has been prepared: hand over to the client code
      yield(
        connector_per_node.map do |node, assigned|
          [node, assigned.nil? ? :no_connector : assigned]
        end.to_h
      )
    else
      connector = @connector_plugins[connector_name]
      connector_nodes = connector_per_node.select { |_node, assigned| assigned == connector }.keys
      if connector_nodes.empty?
        # Nothing to prepare for this connector
        prepare_connectors.call(next_names)
      else
        connector.with_connection_to(connector_nodes, no_exception: no_exception) do |connected_nodes|
          # Nodes the connector did not report as connected are flagged as being in error
          (connector_nodes - connected_nodes).each do |failed_node|
            connector_per_node[failed_node] = :connection_error
          end
          prepare_connectors.call(next_names)
        end
      end
    end
  end
  # Only connectors exposing with_connection_to need a preparation step
  preparable_names = @connector_plugins.select { |_connector_name, connector| connector.respond_to?(:with_connection_to) }.keys
  prepare_connectors.call(preparable_names)
end