Class: HybridPlatformsConductor::ActionsExecutor
- Inherits:
- Object (HybridPlatformsConductor::ActionsExecutor < Object)
- Includes:
- LoggerHelpers
- Defined in:
- lib/hybrid_platforms_conductor/actions_executor.rb
Overview
Gives ways to execute actions on the nodes
Defined Under Namespace
Classes: ConnectionError
Constant Summary
Constants included from LoggerHelpers
LoggerHelpers::LEVELS_MODIFIERS, LoggerHelpers::LEVELS_TO_STDERR
Instance Attribute Summary
-
#max_threads ⇒ Object
Maximum number of threads to spawn in parallel (Integer) [default: 16].
Instance Method Summary
-
#connector(connector_name) ⇒ Object
Get a given connector.
-
#execute_actions(actions_per_nodes, timeout: nil, concurrent: false, log_to_dir: "#{@config.hybrid_platforms_dir}/run_logs", log_to_stdout: true, progress_name: 'Executing actions') ⇒ Object
Execute actions on nodes.
-
#initialize(logger: Logger.new(STDOUT), logger_stderr: Logger.new(STDERR), config: Config.new, cmd_runner: CmdRunner.new, nodes_handler: NodesHandler.new) ⇒ ActionsExecutor
Constructor.
-
#options_parse(options_parser, parallel: true) ⇒ Object
Complete an option parser with options meant to control this Actions Executor.
-
#validate_params ⇒ Object
Validate that parsed parameters are valid.
-
#with_connections_prepared_to(nodes, no_exception: false) ⇒ Object
Prepare connections to a set of nodes.
Methods included from LoggerHelpers
#err, #init_loggers, #log_component=, #log_debug?, #log_level=, #out, #section, #set_loggers_format, #stderr_device, #stderr_device=, #stderr_displayed?, #stdout_device, #stdout_device=, #stdout_displayed?, #stdouts_to_s, #with_progress_bar
Constructor Details
#initialize(logger: Logger.new(STDOUT), logger_stderr: Logger.new(STDERR), config: Config.new, cmd_runner: CmdRunner.new, nodes_handler: NodesHandler.new) ⇒ ActionsExecutor
Constructor
- Parameters
-
logger (Logger): Logger to be used [default = Logger.new(STDOUT)]
-
logger_stderr (Logger): Logger to be used for stderr [default = Logger.new(STDERR)]
-
config (Config): Config to be used. [default = Config.new]
-
cmd_runner (CmdRunner): Command runner to be used. [default = CmdRunner.new]
-
nodes_handler (NodesHandler): Nodes handler to be used. [default = NodesHandler.new]
# File 'lib/hybrid_platforms_conductor/actions_executor.rb', line 38

def initialize(logger: Logger.new(STDOUT), logger_stderr: Logger.new(STDERR), config: Config.new, cmd_runner: CmdRunner.new, nodes_handler: NodesHandler.new)
  init_loggers(logger, logger_stderr)
  @config = config
  @cmd_runner = cmd_runner
  @nodes_handler = nodes_handler
  # Default values
  @max_threads = 16
  @action_plugins = Plugins.new(:action, logger: @logger, logger_stderr: @logger_stderr)
  @connector_plugins = Plugins.new(
    :connector,
    logger: @logger,
    logger_stderr: @logger_stderr,
    init_plugin: proc do |plugin_class|
      plugin_class.new(
        logger: @logger,
        logger_stderr: @logger_stderr,
        config: @config,
        cmd_runner: @cmd_runner,
        nodes_handler: @nodes_handler
      )
    end
  )
end
Instance Attribute Details
#max_threads ⇒ Object
Maximum number of threads to spawn in parallel [default: 16]
Integer
# File 'lib/hybrid_platforms_conductor/actions_executor.rb', line 28

def max_threads
  @max_threads
end
Instance Method Details
#connector(connector_name) ⇒ Object
Get a given connector
- Parameters
-
connector_name (Symbol): The connector name
- Result
-
Connector or nil: The connector, or nil if none found
# File 'lib/hybrid_platforms_conductor/actions_executor.rb', line 246

def connector(connector_name)
  @connector_plugins[connector_name]
end
#execute_actions(actions_per_nodes, timeout: nil, concurrent: false, log_to_dir: "#{@config.hybrid_platforms_dir}/run_logs", log_to_stdout: true, progress_name: 'Executing actions') ⇒ Object
Execute actions on nodes.
- Parameters
-
actions_per_nodes (Hash<Object, Hash<Symbol,Object> or Array< Hash<Symbol,Object> > >): Actions (as a Hash of actions or a list of such Hashes), per nodes selector. See NodesHandler#select_nodes for details about possible nodes selectors. See each action's setup in the actions directory for the possible action types and data.
-
timeout (Integer): Timeout in seconds, or nil if none. [default: nil]
-
concurrent (Boolean): Do we run the commands in parallel? If yes, then stdout of commands is stored in log files. [default: false]
-
log_to_dir (String or nil): Directory name to store log files. Can be nil to not store log files. [default: "#{@config.hybrid_platforms_dir}/run_logs"]
-
log_to_stdout (Boolean): Do we log the command result on stdout? [default: true]
-
progress_name (String): Name to display on the progress bar [default: ‘Executing actions’]
- Result
-
Hash<String, [Integer or Symbol, String, String]>: Exit status code (or Symbol in case of error or dry run), standard output and error for each node.
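The shape of actions_per_nodes can be illustrated with a small self-contained sketch. It does not call the real ActionsExecutor: flatten_actions is a hypothetical helper that mirrors only the Hash-or-Array normalization visible in the source, the node names are made up, and the :remote_bash and :scp action types are illustrative assumptions. It also assumes that a nodes selector is simply a node name or a list of node names, whereas real selectors are resolved by NodesHandler#select_nodes.

```ruby
# Hypothetical helper (not part of the library): flattens the actions given
# per nodes selector into an ordered list of [action_type, action_info]
# pairs per node.
# Assumption: each selector is a node name or an Array of node names.
def flatten_actions(actions_per_nodes)
  actions_per_node = {}
  actions_per_nodes.each do |nodes_selector, nodes_actions|
    # A single Hash of actions is treated like a one-element Array of Hashes
    action_sets = nodes_actions.is_a?(Array) ? nodes_actions : [nodes_actions]
    pairs = action_sets.flat_map(&:to_a)
    Array(nodes_selector).each do |node|
      (actions_per_node[node] ||= []).concat(pairs)
    end
  end
  actions_per_node
end

# Illustrative input: action types and node names are assumptions
actions = {
  'node1' => { remote_bash: 'hostname' },
  %w[node1 node2] => [{ remote_bash: 'uptime' }, { scp: { 'a.txt' => '/tmp' } }]
}

flatten_actions(actions).each do |node, node_actions|
  puts "#{node}: #{node_actions.map(&:first).join(', ')}"
end
```

In the real method, each node of the result maps to an [exit_status, stdout, stderr] triple, where the first element can be a Symbol such as :no_connector or :connection_error when no connection could be prepared.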
# File 'lib/hybrid_platforms_conductor/actions_executor.rb', line 105

def execute_actions(
  actions_per_nodes,
  timeout: nil,
  concurrent: false,
  log_to_dir: "#{@config.hybrid_platforms_dir}/run_logs",
  log_to_stdout: true,
  progress_name: 'Executing actions'
)
  # Keep a list of nodes that will need remote access
  nodes_needing_connectors = []
  # Compute the ordered list of actions per selected node
  # Hash< String, Array< [Symbol, Object ]> >
  # Hash< node, Array< [action_type, action_data]> >
  actions_per_node = {}
  actions_per_nodes.each do |nodes_selector, nodes_actions|
    # Resolved actions, as Action objects
    resolved_nodes_actions = []
    need_remote = false
    (nodes_actions.is_a?(Array) ? nodes_actions : [nodes_actions]).each do |nodes_actions_set|
      nodes_actions_set.each do |action_type, action_info|
        raise 'Cannot have concurrent executions for interactive sessions' if concurrent && action_type == :interactive && action_info
        raise "Unknown action type #{action_type}" unless @action_plugins.key?(action_type)
        action = @action_plugins[action_type].new(
          logger: @logger,
          logger_stderr: @logger_stderr,
          config: @config,
          cmd_runner: @cmd_runner,
          actions_executor: self,
          action_info: action_info
        )
        need_remote = true if action.need_connector?
        resolved_nodes_actions << action
      end
    end
    # Resolve nodes
    resolved_nodes = @nodes_handler.select_nodes(nodes_selector)
    nodes_needing_connectors.concat(resolved_nodes) if need_remote
    resolved_nodes.each do |node|
      actions_per_node[node] = [] unless actions_per_node.key?(node)
      actions_per_node[node].concat(resolved_nodes_actions)
    end
  end
  result = Hash[actions_per_node.keys.map { |node| [node, nil] }]
  with_connections_prepared_to(nodes_needing_connectors, no_exception: true) do |connected_nodes|
    missing_nodes = []
    connected_nodes.each do |node, connector|
      if connector.is_a?(Symbol)
        result[node] = [connector, '', "Unable to get a connector to #{node}"]
        missing_nodes << node
      end
    end
    accessible_nodes = actions_per_node.keys - missing_nodes
    log_debug "Running actions on #{accessible_nodes.size} nodes#{log_to_dir.nil? ? '' : " (logs dumped in #{log_to_dir})"}"
    # Prepare the result (stdout or nil per node)
    unless accessible_nodes.empty?
      # If we run in parallel then clone the connectors, so that each node has its own instance for thread-safe code.
      connected_nodes = Hash[connected_nodes.map { |node, connector| [node, connector.clone] }] if concurrent
      @nodes_handler.for_each_node_in(
        accessible_nodes,
        parallel: concurrent,
        nbr_threads_max: @max_threads,
        progress: progress_name
      ) do |node|
        node_actions = actions_per_node[node]
        # If we run in parallel then clone the actions, so that each node has its own instance for thread-safe code.
        node_actions.map!(&:clone) if concurrent
        result[node] = execute_actions_on(
          node,
          node_actions,
          connected_nodes[node],
          timeout: timeout,
          log_to_file: log_to_dir.nil? ? nil : "#{log_to_dir}/#{node}.stdout",
          log_to_stdout: log_to_stdout
        )
      end
    end
  end
  result
end
#options_parse(options_parser, parallel: true) ⇒ Object
Complete an option parser with options meant to control this Actions Executor
- Parameters
-
options_parser (OptionParser): The option parser to complete
-
parallel (Boolean): Do we activate options regarding parallel execution? [default = true]
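The -m / --max-threads option that this method registers can be tried in isolation with Ruby's standard OptionParser. The following is a minimal sketch, not the library's code: parse_executor_options is a hypothetical helper, and a local variable stands in for the executor's @max_threads, using 16 as the default because that is the value set in the constructor.

```ruby
require 'optparse'

# Hypothetical helper (not part of the library): parses argv and returns
# [max_threads, remaining_args]. The option definition mirrors the one
# registered by options_parse.
def parse_executor_options(argv, default_max_threads: 16)
  max_threads = default_max_threads
  options_parser = OptionParser.new do |opts|
    opts.separator ''
    opts.separator 'Actions Executor options:'
    opts.on('-m', '--max-threads NBR', "Set the number of threads to use for concurrent queries (defaults to #{max_threads})") do |nbr_threads|
      max_threads = nbr_threads.to_i
    end
  end
  # Parse first, then read max_threads, so that the option handler has run
  remaining_args = options_parser.parse(argv)
  [max_threads, remaining_args]
end

max_threads, remaining_args = parse_executor_options(%w[--max-threads 4 some_arg])
puts "max_threads=#{max_threads} remaining=#{remaining_args.inspect}"
```

Arguments that are not options are returned untouched, which is how a calling tool could keep handling its own positional arguments after the executor's options have been consumed.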
# File 'lib/hybrid_platforms_conductor/actions_executor.rb', line 67

def options_parse(options_parser, parallel: true)
  if parallel
    options_parser.separator ''
    options_parser.separator 'Actions Executor options:'
    options_parser.on('-m', '--max-threads NBR', "Set the number of threads to use for concurrent queries (defaults to #{@max_threads})") do |nbr_threads|
      @max_threads = nbr_threads.to_i
    end
  end
  # Display options connectors might have
  @connector_plugins.each do |connector_name, connector|
    if connector.respond_to?(:options_parse)
      options_parser.separator ''
      options_parser.separator "Connector #{connector_name} options:"
      connector.options_parse(options_parser)
    end
  end
end
#validate_params ⇒ Object
Validate that parsed parameters are valid
# File 'lib/hybrid_platforms_conductor/actions_executor.rb', line 86

def validate_params
  @connector_plugins.values.each do |connector|
    connector.validate_params if connector.respond_to?(:validate_params)
  end
end
#with_connections_prepared_to(nodes, no_exception: false) ⇒ Object
Prepare connections to a set of nodes
- Parameters
-
nodes (Array<String>): List of nodes to connect to
-
no_exception (Boolean): Should we continue even if some nodes can’t be connected to? [default: false]
-
Proc: Code called with connections prepared
- Parameters
-
connected_nodes (Hash<String, Connector or Symbol>): Prepared connectors (or Symbol in case of failure with no_exception), per node name
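How nodes end up paired with a connector, or with a Symbol, can be sketched without the library. The example below is an illustration under assumptions: StubConnector and assign_connectors are hypothetical names, and the prefix-based matching is invented for the demo. Only connectable_nodes_from and the :no_connector Symbol come from the source listing of this method.

```ruby
# Hypothetical connector stub: claims the nodes whose name starts with a prefix.
# Only the connectable_nodes_from method name comes from the documented source.
StubConnector = Struct.new(:name, :prefix) do
  def connectable_nodes_from(nodes)
    nodes.select { |node| node.start_with?(prefix) }
  end
end

# Assign to each node the first connector (in declaration order) that can reach it
def assign_connectors(nodes, connectors)
  assignment = nodes.to_h { |node| [node, nil] }
  connectors.each do |connector|
    unassigned = assignment.select { |_node, selected| selected.nil? }.keys
    break if unassigned.empty?

    (connector.connectable_nodes_from(unassigned) & unassigned).each do |node|
      assignment[node] = connector
    end
  end
  # Nodes left without any connector are reported with a Symbol
  assignment.transform_values { |selected| selected.nil? ? :no_connector : selected }
end

connectors = [StubConnector.new(:local, 'local'), StubConnector.new(:prod, 'prod')]
assign_connectors(%w[local1 prod1 lab1], connectors).each do |node, selected|
  puts "#{node} => #{selected.is_a?(Symbol) ? selected.inspect : selected.name}"
end
```

Because connectors are tried in order and only unassigned nodes are offered to the next one, the order in which connectors are declared acts as a priority.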
# File 'lib/hybrid_platforms_conductor/actions_executor.rb', line 193

def with_connections_prepared_to(nodes, no_exception: false)
  # Make sure every node needing connectors finds a connector
  nodes_needing_connectors = Hash[nodes.map { |node| [node, nil] }]
  @connector_plugins.each do |connector_name, connector|
    nodes_without_connectors = nodes_needing_connectors.select { |_node, selected_connector| selected_connector.nil? }.keys
    break if nodes_without_connectors.empty?
    (connector.connectable_nodes_from(nodes_without_connectors) & nodes_without_connectors).each do |node|
      nodes_needing_connectors[node] = connector if nodes_needing_connectors[node].nil?
    end
  end
  # If some nodes need connectors but can't find any, then fail
  nodes_without_connectors = nodes_needing_connectors.select { |_node, selected_connector| selected_connector.nil? }.keys
  unless nodes_without_connectors.empty?
    message = "The following nodes have no possible connector to them: #{nodes_without_connectors.sort.join(', ')}"
    log_warn message
    raise message unless no_exception
  end
  # Prepare the connectors to operate on the nodes they have been assigned to
  preparation_code = proc do |remaining_plugins_to_prepare|
    connector_name = remaining_plugins_to_prepare.first
    if connector_name.nil?
      # All plugins have been prepared.
      # Call our client code.
      yield Hash[nodes_needing_connectors.map do |node, selected_connector|
        [
          node,
          selected_connector.nil? ? :no_connector : selected_connector
        ]
      end]
    else
      connector = @connector_plugins[connector_name]
      selected_nodes = nodes_needing_connectors.select { |_node, selected_connector| selected_connector == connector }.keys
      if selected_nodes.empty?
        preparation_code.call(remaining_plugins_to_prepare[1..-1])
      else
        connector.with_connection_to(selected_nodes, no_exception: no_exception) do |connected_nodes|
          (selected_nodes - connected_nodes).each do |node_in_error|
            nodes_needing_connectors[node_in_error] = :connection_error
          end
          preparation_code.call(remaining_plugins_to_prepare[1..-1])
        end
      end
    end
  end
  preparation_code.call(@connector_plugins.select { |_connector_name, connector| connector.respond_to?(:with_connection_to) }.keys)
end
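The recursive preparation_code proc above nests one with_connection_to block per connector before finally yielding to the caller. The same pattern can be shown in a few lines. This is a simplified sketch under assumptions: StubResource and with_all_prepared are hypothetical names, the stub's with_connection_to takes no arguments (unlike a real connector), and the log array exists only to make the nesting order visible.

```ruby
# Hypothetical resource whose with_connection_to wraps a block,
# opening something before the block and closing it afterwards.
class StubResource
  def initialize(name, log)
    @name = name
    @log = log
  end

  def with_connection_to
    @log << "open #{@name}"
    yield
  ensure
    @log << "close #{@name}"
  end
end

# Open every resource in turn, each block nested inside the previous one,
# then run the client code once all of them are open.
def with_all_prepared(resources, &client_code)
  if resources.empty?
    client_code.call
  else
    resources.first.with_connection_to do
      with_all_prepared(resources.drop(1), &client_code)
    end
  end
end

log = []
resources = [StubResource.new('A', log), StubResource.new('B', log)]
with_all_prepared(resources) { log << 'run actions' }
puts log
```

Resources are closed in the reverse order of their opening, and the ensure clause closes them even if the client code raises, which is the main benefit of nesting blocks rather than opening all connections in a flat loop.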