Class: MCollective::Util::TasksSupport
- Inherits: Object
  - Object
  - MCollective::Util::TasksSupport
- Defined in:
- lib/mcollective/util/tasks_support.rb,
lib/mcollective/util/tasks_support/cli.rb,
lib/mcollective/util/tasks_support/json_formatter.rb,
lib/mcollective/util/tasks_support/default_formatter.rb
Defined Under Namespace
Classes: CLI
Instance Attribute Summary
-
#cache_dir ⇒ Object
readonly
Returns the value of attribute cache_dir.
-
#choria ⇒ Object
readonly
Returns the value of attribute choria.
Instance Method Summary
-
#aio? ⇒ Boolean
Is this an AIO install?
-
#aio_bin_path ⇒ Object
AIO path to binaries like wrappers etc.
-
#aio_wrapper_path ⇒ String
Path to the AIO task wrapper executable.
-
#cache_task_file(file) ⇒ Object
Attempts to download and cache the file.
-
#cached?(files) ⇒ Boolean
Given a task description checks all files are correctly cached.
-
#cli(format, verbose) ⇒ CLI
Creates an instance of the CLI helpers.
-
#create_request_spooldir(requestid, task) ⇒ String
Generates the spool path and creates it.
-
#create_task_stdout(stdout, completed, exitcode, wrapper_output) ⇒ Object
Parses the stdout and turns it into a JSON object.
-
#download_files(files) ⇒ Boolean
Downloads and caches a file set.
-
#file_sha256(file_path) ⇒ String
Calculates a hex digest SHA256 for a specific file.
-
#file_size(file_path) ⇒ Integer
Determines the file size of a specific file.
-
#http_get(path, headers = {}, &blk) ⇒ Net::HTTPRequest
Performs an HTTP GET against the Puppet Server.
-
#initialize(choria, cache_dir = nil) ⇒ TasksSupport
constructor
A new instance of TasksSupport.
-
#parse_task(task) ⇒ Array<String>
Parses a task name like module::task into its two pieces.
-
#platform_specific_command(path) ⇒ Object
Expands the path into a platform specific version.
-
#populate_spooldir(spooldir, task) ⇒ Object
Copy task files to the spool directory.
-
#ps_shim_path ⇒ String
Path to the PowerShell shim for the powershell input method.
-
#puppet_type_to_ruby(type) ⇒ Class, Boolean
Converts a Puppet type into something mcollective understands.
-
#request_spooldir(requestid) ⇒ String
Generate the path to the spool for a specific request.
-
#run_task_command(requestid, task, wait = true, callerid = "local") ⇒ Hash
Given a task spec runs it via the Puppet wrappers.
-
#spawn_command(command, environment, stdin, spooldir, run_as) ⇒ Integer
Runs the wrapper command detached from mcollective.
-
#task_command(spooldir, task) ⇒ String
Given a task spec figures out the command to run using the wrapper.
-
#task_complete?(requestid) ⇒ Boolean
Determines if a task is completed.
-
#task_environment(task, task_id, task_caller) ⇒ Hash
Given a task spec calculates the correct environment hash.
-
#task_failed?(status) ⇒ Boolean
Determines if a task failed based on its status.
-
#task_file?(file) ⇒ Boolean
Validates a task cache file.
-
#task_file_name(file) ⇒ String
Determines the cache path for a task file.
-
#task_input(task) ⇒ Hash?
Given a task spec, creates the standard input.
-
#task_input_method(task) ⇒ "powershell", ...
Given a task spec figures out the input method.
-
#task_metadata(task, environment) ⇒ Hash
Requests a task's metadata from Puppet Server.
-
#task_module(task) ⇒ String
Return a task’s module.
-
#task_names(environment) ⇒ Array<String>
Retrieves the list of known task names.
-
#task_ran?(requestid) ⇒ Boolean
Determines if a task already ran by checking if its spool exists.
-
#task_runtime(requestid) ⇒ Float
Determines how long a task ran for.
-
#task_status(requestid) ⇒ Hash
Determines the task status for a given request.
-
#tasks(environment) ⇒ Hash
Retrieves the list of known tasks in an environment.
-
#tasks_compatible? ⇒ Boolean
Determines if a machine is compatible with running bolt.
-
#validate_task_inputs(inputs, task) ⇒ Array[Boolean, Error]
Validates that the inputs would be acceptable to the task.
-
#wait_for_task_completion(requestid) ⇒ Object
Waits for a task to complete.
-
#wrapper_path ⇒ String
Path to the task wrapper executable.
Constructor Details
#initialize(choria, cache_dir = nil) ⇒ TasksSupport
Returns a new instance of TasksSupport.
# File 'lib/mcollective/util/tasks_support.rb', line 10
def initialize(choria, cache_dir=nil)
  @choria = choria
  @cache_dir = cache_dir || @choria.get_option("choria.tasks_cache")
end
Instance Attribute Details
#cache_dir ⇒ Object (readonly)
Returns the value of attribute cache_dir.
# File 'lib/mcollective/util/tasks_support.rb', line 8
def cache_dir
  @cache_dir
end
#choria ⇒ Object (readonly)
Returns the value of attribute choria.
# File 'lib/mcollective/util/tasks_support.rb', line 8
def choria
  @choria
end
Instance Method Details
#aio? ⇒ Boolean
Is this an AIO install?
# File 'lib/mcollective/util/tasks_support.rb', line 94
def aio?
  File.directory?(aio_bin_path)
end
#aio_bin_path ⇒ Object
AIO path to binaries like wrappers etc
# File 'lib/mcollective/util/tasks_support.rb', line 66
def aio_bin_path
  if Util.windows?
    'C:\Program Files\Puppet Labs\Puppet\puppet\bin'
  else
    "/opt/puppetlabs/puppet/bin"
  end
end
#aio_wrapper_path ⇒ String
Path to the AIO task wrapper executable
# File 'lib/mcollective/util/tasks_support.rb', line 77
def aio_wrapper_path
  if Util.windows?
    legacy = File.join(aio_bin_path, "task_wrapper.exe")
    return legacy if File.exist?(legacy)

    File.join(aio_bin_path, "execution_wrapper.exe")
  else
    legacy = File.join(aio_bin_path, "task_wrapper")
    return legacy if File.exist?(legacy)

    File.join(aio_bin_path, "execution_wrapper")
  end
end
#cache_task_file(file) ⇒ Object
Attempts to download and cache the file
Note: this does not first check whether the cache is valid; it downloads unconditionally.
# File 'lib/mcollective/util/tasks_support.rb', line 711
def cache_task_file(file)
  path = [file["uri"]["path"], URI.encode_www_form(file["uri"]["params"])].join("?")

  file_name = task_file_name(file)

  Log.debug("Caching task to %s" % file_name)

  http_get(path, "Accept" => "application/octet-stream") do |resp|
    raise("Failed to request task content %s: %s: %s" % [path, resp.code, resp.body]) unless resp.code == "200"

    FileUtils.mkdir_p(cache_dir, mode: 0o0750) # rubocop:disable Style/HashSyntax
    FileUtils.rm_rf(file_name) if File.directory?(file_name)

    task_file = Tempfile.new("tasks_%s" % file["filename"])
    task_file.binmode

    resp.read_body do |segment|
      task_file.write(segment)
    end

    task_file.close

    FileUtils.chmod(0o0750, task_file.path)
    FileUtils.mv(task_file.path, file_name)
  end
end
#cached?(files) ⇒ Boolean
Given a task description, checks that all files are correctly cached
Note: this checks all files, though for now there is only ever one file.
# File 'lib/mcollective/util/tasks_support.rb', line 141
def cached?(files)
  files.map {|f| task_file?(f)}.all?
end
#cli(format, verbose) ⇒ CLI
Creates an instance of the CLI helpers
# File 'lib/mcollective/util/tasks_support.rb', line 19
def cli(format, verbose)
  require_relative "tasks_support/cli"
  CLI.new(self, format, verbose)
end
#create_request_spooldir(requestid, task) ⇒ String
Generates the spool path and creates it
# File 'lib/mcollective/util/tasks_support.rb', line 218
def create_request_spooldir(requestid, task)
  dir = request_spooldir(requestid)

  FileUtils.mkdir_p(dir, mode: 0o0750) # rubocop:disable Style/HashSyntax

  populate_spooldir(dir, task)

  dir
end
#create_task_stdout(stdout, completed, exitcode, wrapper_output) ⇒ Object
Parses the stdout and turns it into a JSON object
If the output parses as a JSON object it is returned as-is; otherwise it is wrapped in _output, as per version 1 of the Tasks spec.
# File 'lib/mcollective/util/tasks_support.rb', line 431
def create_task_stdout(stdout, completed, exitcode, wrapper_output)
  result = {}

  unless wrapper_output.empty?
    result["_error"] = {
      "kind" => "choria.tasks/wrapper-error",
      "msg" => "The task wrapper failed to run",
      "details" => {
        "wrapper_output" => wrapper_output
      }
    }

    return result.to_json
  end

  begin
    data = JSON.parse(stdout)

    if data.is_a?(Hash)
      result = data
    else
      result["_output"] = stdout
    end
  rescue
    result["_output"] = stdout
  end

  if exitcode != 0 && completed && !result["_error"]
    result["_error"] = {
      "kind" => "choria.tasks/task-error",
      "msg" => "The task errored with a code %d" % exitcode,
      "details" => {
        "exitcode" => exitcode
      }
    }
  end

  result
end
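The wrapping rule described above can be tried in isolation. The following is a minimal sketch, not the method itself: wrap_task_stdout is a hypothetical helper name, and it deliberately leaves out the wrapper-error and exit-code branches.

```ruby
require "json"

# Hypothetical standalone helper: covers only the stdout handling,
# not the wrapper-error or exit-code branches of create_task_stdout.
def wrap_task_stdout(stdout)
  data = JSON.parse(stdout)
  # Only a JSON object (Hash) is passed through unchanged.
  data.is_a?(Hash) ? data : {"_output" => stdout}
rescue JSON::ParserError
  # Anything that is not valid JSON is wrapped verbatim.
  {"_output" => stdout}
end

json_result = wrap_task_stdout('{"status": "ok"}')
text_result = wrap_task_stdout("service restarted")

puts json_result["status"]  # prints: ok
puts text_result["_output"] # prints: service restarted
```

Valid JSON that is not an object, such as a bare array, is also wrapped in _output, which matches the is_a?(Hash) check in the source.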
#download_files(files) ⇒ Boolean
Downloads and caches a file set
# File 'lib/mcollective/util/tasks_support.rb', line 743
def download_files(files)
  Log.info("Downloading %d task files" % files.size)

  files.each do |file|
    next if task_file?(file)

    try = 0

    begin
      return false if try == 2

      try += 1

      Log.debug("Downloading task file %s (try %s/2)" % [file["filename"], try])

      cache_task_file(file)
    rescue
      Log.error(msg = "Could not download task file: %s: %s" % [$!.class, $!.to_s])

      sleep 0.1

      retry if try < 2

      raise(msg)
    end
  end

  true
end
#file_sha256(file_path) ⇒ String
Calculates a hex digest SHA256 for a specific file
# File 'lib/mcollective/util/tasks_support.rb', line 675
def file_sha256(file_path)
  Digest::SHA256.file(file_path).hexdigest
end
#file_size(file_path) ⇒ Integer
Determines the file size of a specific file
# File 'lib/mcollective/util/tasks_support.rb', line 683
def file_size(file_path)
  File.stat(file_path).size
rescue
  -1
end
#http_get(path, headers = {}, &blk) ⇒ Net::HTTPRequest
Performs an HTTP GET against the Puppet Server
# File 'lib/mcollective/util/tasks_support.rb', line 607
def http_get(path, headers={}, &blk)
  transport = choria.https(choria.puppet_server, true)
  request = choria.http_get(path)

  headers.each do |k, v|
    request[k] = v
  end

  transport.request(request, &blk)
end
#parse_task(task) ⇒ Array<String>
Parses a task name like module::task into its two pieces
# File 'lib/mcollective/util/tasks_support.rb', line 577
def parse_task(task)
  parts = task.split("::")

  parts << "init" if parts.size == 1

  parts
end
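As a quick illustration of the splitting behaviour, the sketch below copies the logic into a free-standing function so it runs without MCollective installed. The module name apache is only an example.

```ruby
# Standalone copy of the parse_task logic, for illustration only.
def parse_task(task)
  parts = task.split("::")
  # A bare module name refers to that module's "init" task.
  parts << "init" if parts.size == 1
  parts
end

p parse_task("apache::restart") # => ["apache", "restart"]
p parse_task("apache")          # => ["apache", "init"]
```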
#platform_specific_command(path) ⇒ Object
Expands the path into a platform specific version
# File 'lib/mcollective/util/tasks_support.rb', line 117
def platform_specific_command(path)
  return [path] unless Util.windows?

  extension = File.extname(path)

  # https://github.com/puppetlabs/pxp-agent/blob/3e7cada3cedf7f78703781d44e70010d0c5ad209/lib/src/modules/task.cc#L98-L107
  case extension
  when ".rb"
    ["ruby", path]
  when ".pp"
    ["puppet", "apply", path]
  when ".ps1"
    ["powershell", "-NoProfile", "-NonInteractive", "-NoLogo", "-ExecutionPolicy", "Bypass", "-File", path]
  else
    [path]
  end
end
#populate_spooldir(spooldir, task) ⇒ Object
Copy task files to the spool directory
# File 'lib/mcollective/util/tasks_support.rb', line 231
def populate_spooldir(spooldir, task)
  task["files"].each do |file|
    filename = file["filename"]
    filename = File.join(task_module(task["task"]), "tasks", filename) unless filename.index("/")

    spool_filename = File.join(spooldir, "files", filename)

    FileUtils.mkdir_p(File.dirname(spool_filename), mode: 0o0750) # rubocop:disable Style/HashSyntax
    FileUtils.cp(task_file_name(file), spool_filename)
  end
end
#ps_shim_path ⇒ String
Path to the PowerShell shim for the powershell input method
# File 'lib/mcollective/util/tasks_support.rb', line 108
def ps_shim_path
  File.join(aio_bin_path, "PowershellShim.ps1")
end
#puppet_type_to_ruby(type) ⇒ Class, Boolean
Converts a Puppet type into something mcollective understands
This is inevitably hacky by nature, as there is no way for me to parse the types. PAL might get some helpers for this, but until then this is best effort.
When a situation is too complex, users can always pass --input with some JSON to work around it until something better comes along.
# File 'lib/mcollective/util/tasks_support.rb', line 35
def puppet_type_to_ruby(type)
  array = false
  required = true

  if type =~ /Optional\[(.+)/
    type = $1
    required = false
  end

  if type =~ /Array\[(.+)/
    type = $1
    array = true
  end

  return [Numeric, array, required] if type =~ /Integer/
  return [Numeric, array, required] if type =~ /Float/
  return [Hash, array, required] if type =~ /Hash/
  return [:boolean, array, required] if type =~ /Boolean/

  [String, array, required]
end
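To make the best-effort nature concrete, here is a sketch that copies the mapping into a free-standing function and runs it on a few type strings. The inputs are illustrative; the last one shows how substring matching can pick a type other than the outermost one.

```ruby
# Standalone copy of the puppet_type_to_ruby heuristic, for illustration only.
# Returns [ruby_type, array?, required?].
def puppet_type_to_ruby(type)
  array = false
  required = true

  if type =~ /Optional\[(.+)/
    type = $1
    required = false
  end

  if type =~ /Array\[(.+)/
    type = $1
    array = true
  end

  return [Numeric, array, required] if type =~ /Integer/
  return [Numeric, array, required] if type =~ /Float/
  return [Hash, array, required] if type =~ /Hash/
  return [:boolean, array, required] if type =~ /Boolean/

  [String, array, required]
end

p puppet_type_to_ruby("String")                   # => [String, false, true]
p puppet_type_to_ruby("Optional[Array[Integer]]") # => [Numeric, true, false]
p puppet_type_to_ruby("Boolean")                  # => [:boolean, false, true]
# Integer is tested before Hash, so the inner type wins here:
p puppet_type_to_ruby("Hash[String, Integer]")    # => [Numeric, false, true]
```

Cases like the last one are where passing raw JSON via --input is the more reliable route.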
#request_spooldir(requestid) ⇒ String
Generate the path to the spool for a specific request
# File 'lib/mcollective/util/tasks_support.rb', line 208
def request_spooldir(requestid)
  File.join(choria.tasks_spool_dir, requestid)
end
#run_task_command(requestid, task, wait = true, callerid = "local") ⇒ Hash
Given a task spec, runs it via the Puppet wrappers
Note: be sure to download the task files before running this.
The task runs in the background and this method waits for it to finish. Should the thread this method runs in be killed, the process continues, and its status can be checked later using the request id.
# File 'lib/mcollective/util/tasks_support.rb', line 358
def run_task_command(requestid, task, wait=true, callerid="local")
  raise("The task wrapper %s does not exist, please upgrade Puppet" % wrapper_path) unless File.exist?(wrapper_path)
  raise("Task %s is not available or does not match the specification, please download it" % task["task"]) unless cached?(task["files"])
  raise("Task spool for request %s already exist, cannot rerun" % requestid) if task_ran?(requestid)

  spool = create_request_spooldir(requestid, task)
  command = task_command(spool, task)

  Log.debug("Trying to spawn task %s in spool %s using command %s" % [task["task"], spool, command])

  wrapper_input = {
    "executable" => command[0],
    "arguments" => command[1..-1],
    "input" => task_input(task),
    "stdout" => File.join(spool, "stdout"),
    "stderr" => File.join(spool, "stderr"),
    "exitcode" => File.join(spool, "exitcode")
  }

  File.open(File.join(spool, "choria.json"), "w") do |meta|
    data = {
      "start_time" => Time.now.utc.to_i,
      "caller" => callerid,
      "task" => task["task"],
      "request" => wrapper_input
    }

    meta.print(data.to_json)
  end

  pid = spawn_command(wrapper_path, task_environment(task, requestid, callerid), wrapper_input.to_json, spool, task["run_as"])

  Log.info("Spawned task %s in spool %s with pid %s" % [task["task"], spool, pid])

  wait_for_task_completion(requestid) if wait

  task_status(requestid)
end
#spawn_command(command, environment, stdin, spooldir, run_as) ⇒ Integer
Runs the wrapper command detached from mcollective
We always detach because we have no idea how long these tasks will run, since people can do whatever they like. We then check on them until the agent timeout, but if the timeout is reached they keep running.
The idea is that the UI will in that case present the user with a request id, which is also the spool name, and the user can later act on these tasks, either by asking for their status or perhaps by killing them.
# File 'lib/mcollective/util/tasks_support.rb', line 268
def spawn_command(command, environment, stdin, spooldir, run_as)
  wrapper_input = File.join(spooldir, "wrapper_stdin")
  wrapper_stdout = File.join(spooldir, "wrapper_stdout")
  wrapper_stderr = File.join(spooldir, "wrapper_stderr")
  wrapper_pid = File.join(spooldir, "wrapper_pid")

  options = {
    :chdir => "/",
    :in => :close,
    :out => wrapper_stdout,
    :err => wrapper_stderr
  }

  if stdin
    File.open(wrapper_input, "w") {|i| i.print(stdin) }
    options[:in] = wrapper_input
  end

  if run_as
    raise("System does not allow forking. run_as not usable.") unless Process.respond_to?(:fork)

    require "etc"

    u = Etc.getpwnam(run_as)
    FileUtils.chown_R(u.uid, u.gid, spooldir)
    pid = Process.fork
    if pid.nil?
      Process.gid = Process.egid = u.gid
      Process.uid = Process.euid = u.uid
      ENV.delete_if { |name| name !~ /^LC_/ }
      Process.exec(environment, command, options)
    end
  else
    pid = Process.spawn(environment, [command, command], options)
  end

  sleep 0.1 until File.exist?(wrapper_stdout)

  File.open(wrapper_pid, "w") {|p| p.write(pid)}

  Process.detach(pid)

  pid
end
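The detach pattern can be sketched on its own using only the Ruby standard library. This is an illustration under assumptions, not the method above: spawn_detached is a hypothetical helper, the child is a throwaway Ruby one-liner instead of the task wrapper, and stdin is redirected from File::NULL instead of being closed.

```ruby
require "rbconfig"
require "tmpdir"

# Hypothetical helper: spawns a child with stdout/stderr redirected into the
# spool directory and detaches it, so the parent never has to wait for it.
def spawn_detached(spool, environment)
  out = File.join(spool, "wrapper_stdout")
  err = File.join(spool, "wrapper_stderr")

  pid = Process.spawn(
    environment,
    RbConfig.ruby, "-e", "print ENV['_task']",
    :in => File::NULL, :out => out, :err => err
  )

  # Process.detach returns a thread that reaps the child when it exits.
  Process.detach(pid)
end

Dir.mktmpdir do |spool|
  reaper = spawn_detached(spool, "_task" => "demo::task")
  reaper.join # only for the demo; a real caller would poll the spool instead
  puts File.read(File.join(spool, "wrapper_stdout")) # prints: demo::task
end
```

Because all output lands in files under the spool, a later process that knows only the spool path can still inspect the result.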
#task_command(spooldir, task) ⇒ String
Given a task spec figures out the command to run using the wrapper
# File 'lib/mcollective/util/tasks_support.rb', line 165
def task_command(spooldir, task)
  file_spec = task["files"][0]
  file_name = File.join(spooldir, "files", task_module(task["task"]), "tasks", file_spec["filename"])

  command = platform_specific_command(file_name)

  command.unshift(ps_shim_path) if task_input_method(task) == "powershell"

  command
end
#task_complete?(requestid) ⇒ Boolean
Determines if a task is completed
Tasks run under the wrapper, which writes the exit code to a file only after the command has exited, so this checks for that file to appear. A non-empty wrapper_stderr file also counts as complete.
# File 'lib/mcollective/util/tasks_support.rb', line 331
def task_complete?(requestid)
  exitcode = File.join(request_spooldir(requestid), "exitcode")
  wrapper_stderr = File.join(request_spooldir(requestid), "wrapper_stderr")

  File.exist?(wrapper_stderr) && file_size(wrapper_stderr) > 0 || File.exist?(exitcode) && file_size(exitcode) > 0
end
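The completion check reduces to looking for two non-empty files in the spool. A minimal sketch against a temporary directory follows; complete? and non_empty? are hypothetical names, and the spool path is passed in directly rather than derived from a request id.

```ruby
require "tmpdir"

# True when the file exists and has content.
def non_empty?(path)
  File.exist?(path) && File.size(path) > 0
end

# Hypothetical stand-in for task_complete?: a task counts as complete when
# the wrapper reported an error or the exit code file has been written.
def complete?(spool)
  non_empty?(File.join(spool, "wrapper_stderr")) ||
    non_empty?(File.join(spool, "exitcode"))
end

Dir.mktmpdir do |spool|
  puts complete?(spool) # prints: false
  File.write(File.join(spool, "exitcode"), "0")
  puts complete?(spool) # prints: true
end
```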
#task_environment(task, task_id, task_caller) ⇒ Hash
Given a task spec calculates the correct environment hash
# File 'lib/mcollective/util/tasks_support.rb', line 182
def task_environment(task, task_id, task_caller)
  environment = {
    "_task" => task["task"],
    "_choria_task_id" => task_id,
    "_choria_task_caller" => task_caller
  }

  environment["PATH"] = "#{aio_bin_path}#{File::PATH_SEPARATOR}#{ENV['PATH']}" if aio?

  return environment unless task["input"]
  return environment unless ["both", "environment"].include?(task_input_method(task))

  JSON.parse(task["input"]).each do |k, v|
    environment["PT_%s" % k] = v.to_s
  end

  environment["PT__task"] = task["task"]
  environment["PT__installdir"] = File.join(request_spooldir(task_id), "files")

  environment
end
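The PT_ variable mapping is the part of this method most likely to matter to task authors. A minimal sketch of just that step, with a hypothetical helper name and made-up input:

```ruby
require "json"

# Hypothetical helper: turns the JSON-encoded task input into PT_-prefixed
# environment variables, stringifying every value.
def pt_environment(input_json)
  JSON.parse(input_json).each_with_object({}) do |(key, value), env|
    env["PT_%s" % key] = value.to_s
  end
end

env = pt_environment('{"package": "nginx", "count": 2}')

puts env["PT_package"] # prints: nginx
puts env["PT_count"]   # prints: 2
```

Since every value passes through to_s, a task reading PT_count receives the string "2" and has to convert it back itself.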
#task_failed?(status) ⇒ Boolean
Determines if a task failed based on its status
# File 'lib/mcollective/util/tasks_support.rb', line 475
def task_failed?(status)
  return true unless status["wrapper_spawned"]
  return true unless status["wrapper_pid"]
  return true unless status["wrapper_error"].empty?
  return true if status["exitcode"] != 0 && status["completed"]
  return true if status["stdout"].include?("_error")

  false
end
#task_file?(file) ⇒ Boolean
Validates a task cache file
# File 'lib/mcollective/util/tasks_support.rb', line 693
def task_file?(file)
  file_name = task_file_name(file)

  Log.debug("Checking if file %s is cached using %s" % [file_name, file.pretty_inspect])

  return false unless File.exist?(file_name)
  return false unless file_size(file_name) == file["size_bytes"]
  return false unless file_sha256(file_name) == file["sha256"]

  true
end
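The validation amounts to comparing size and SHA256 digest against the file spec. A standalone sketch, assuming a hypothetical valid_cache_file? helper that takes the path directly instead of deriving it from the cache directory:

```ruby
require "digest"
require "tempfile"

# Hypothetical helper mirroring the three checks: existence, size, digest.
def valid_cache_file?(path, spec)
  return false unless File.exist?(path)
  return false unless File.size(path) == spec["size_bytes"]

  Digest::SHA256.file(path).hexdigest == spec["sha256"]
end

Tempfile.create("task") do |f|
  f.write("#!/bin/sh\necho hello\n")
  f.flush

  spec = {
    "size_bytes" => File.size(f.path),
    "sha256" => Digest::SHA256.file(f.path).hexdigest
  }

  puts valid_cache_file?(f.path, spec)                          # prints: true
  puts valid_cache_file?(f.path, spec.merge("size_bytes" => 1)) # prints: false
end
```

Checking the size first is cheap and avoids hashing a file that is obviously wrong.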
#task_file_name(file) ⇒ String
Determines the cache path for a task file
# File 'lib/mcollective/util/tasks_support.rb', line 598
def task_file_name(file)
  File.join(cache_dir, file["sha256"])
end
#task_input(task) ⇒ Hash?
Given a task spec, creates the standard input
# File 'lib/mcollective/util/tasks_support.rb', line 247
def task_input(task)
  task["input"] if ["both", "powershell", "stdin"].include?(task_input_method(task))
end
#task_input_method(task) ⇒ "powershell", ...
Given a task spec figures out the input method
# File 'lib/mcollective/util/tasks_support.rb', line 149
def task_input_method(task)
  # the spec says only 1 executable, no idea what the point of the 'files' is
  file_extension = File.extname(task["files"][0]["filename"])

  input_method = task["input_method"]
  input_method = "powershell" if input_method.nil? && file_extension == ".ps1"
  input_method ||= "both"

  input_method
end
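The defaulting rules are easiest to see with a few task specs. This sketch copies the logic into a free-standing function; input_method_for is a hypothetical name and the file names are examples.

```ruby
# Standalone copy of the input method selection, for illustration only.
def input_method_for(task)
  extension = File.extname(task["files"][0]["filename"])

  method = task["input_method"]
  # PowerShell scripts default to the powershell method when none is given.
  method = "powershell" if method.nil? && extension == ".ps1"
  method || "both"
end

puts input_method_for("files" => [{"filename" => "init.sh"}])   # prints: both
puts input_method_for("files" => [{"filename" => "init.ps1"}])  # prints: powershell
puts input_method_for("input_method" => "stdin",
                      "files" => [{"filename" => "init.ps1"}])  # prints: stdin
```

An explicit input_method always wins; the file extension only matters when none is set.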
#task_metadata(task, environment) ⇒ Hash
Requests a task's metadata from Puppet Server
# File 'lib/mcollective/util/tasks_support.rb', line 624
def task_metadata(task, environment)
  parsed = parse_task(task)
  path = "/puppet/v3/tasks/%s/%s?environment=%s" % [parsed[0], parsed[1], environment]

  resp = http_get(path)

  raise("Failed to request task metadata: %s: %s" % [resp.code, resp.body]) unless resp.code == "200"

  result = JSON.parse(resp.body)

  result["metadata"] ||= {}
  result["metadata"]["parameters"] ||= {}
  result["files"] ||= []

  result
end
#task_module(task) ⇒ String
Return a task’s module
# File 'lib/mcollective/util/tasks_support.rb', line 590
def task_module(task)
  parse_task(task)[0]
end
#task_names(environment) ⇒ Array<String>
Retrieves the list of known task names
# File 'lib/mcollective/util/tasks_support.rb', line 568
def task_names(environment)
  tasks(environment).map {|t| t["name"]}
end
#task_ran?(requestid) ⇒ Boolean
Determines if a task already ran by checking if its spool exists
# File 'lib/mcollective/util/tasks_support.rb', line 319
def task_ran?(requestid)
  File.directory?(request_spooldir(requestid))
end
#task_runtime(requestid) ⇒ Float
Determines how long a task ran for
Tasks that had wrapper failures have a run time of 0. For tasks that are still running, the runtime is calculated up to now, so it increases on each invocation.
# File 'lib/mcollective/util/tasks_support.rb', line 405
def task_runtime(requestid)
  spool = request_spooldir(requestid)
  wrapper_stderr = File.join(spool, "wrapper_stderr")
  wrapper_pid = File.join(spool, "wrapper_pid")
  exitcode = File.join(spool, "exitcode")

  if task_complete?(requestid) && File.exist?(exitcode)
    Float(File::Stat.new(exitcode).mtime - File::Stat.new(wrapper_pid).mtime)
  elsif task_complete?(requestid) && file_size(wrapper_stderr) > 0
    0.0
  else
    Float(Time.now - File::Stat.new(wrapper_pid).mtime)
  end
end
#task_status(requestid) ⇒ Hash
Determines the task status for a given request
# File 'lib/mcollective/util/tasks_support.rb', line 489
def task_status(requestid)
  raise("Task %s have not been requested" % requestid) unless task_ran?(requestid)

  spool = request_spooldir(requestid)
  stdout = File.join(spool, "stdout")
  stderr = File.join(spool, "stderr")
  exitcode = File.join(spool, "exitcode")
  wrapper_stderr = File.join(spool, "wrapper_stderr")
  wrapper_pid = File.join(spool, "wrapper_pid")
  meta = File.join(spool, "choria.json")

  result = {
    "spool" => spool,
    "task" => nil,
    "caller" => nil,
    "stdout" => "",
    "stderr" => "",
    "exitcode" => -1,
    "runtime" => task_runtime(requestid),
    "start_time" => Time.at(0).utc,
    "wrapper_spawned" => false,
    "wrapper_error" => "",
    "wrapper_pid" => nil,
    "completed" => task_complete?(requestid)
  }

  result["exitcode"] = Integer(File.read(exitcode)) if File.exist?(exitcode)

  if task_ran?(requestid)
    result["stdout"] = File.read(stdout) if File.exist?(stdout)
    result["stderr"] = File.read(stderr) if File.exist?(stderr)
    result["wrapper_spawned"] = File.exist?(wrapper_stderr) && file_size(wrapper_stderr) == 0

    result["wrapper_error"] = File.read(wrapper_stderr) if File.exist?(wrapper_stderr) && file_size(wrapper_stderr) > 0

    if File.exist?(wrapper_pid) && file_size(wrapper_pid) > 0
      result["start_time"] = File::Stat.new(wrapper_pid).mtime.utc
      result["wrapper_pid"] = Integer(File.read(wrapper_pid))
    end
  end

  if File.exist?(meta)
    metadata = JSON.parse(File.read(meta))

    result["start_time"] = Time.at(metadata["start_time"]).utc
    result["caller"] = metadata["caller"]
    result["task"] = metadata["task"]
  end

  result["stdout"] = create_task_stdout(
    result["stdout"],
    result["completed"],
    result["exitcode"],
    result["wrapper_error"]
  )

  result
end
#tasks(environment) ⇒ Hash
Retrieves the list of known tasks in an environment
# File 'lib/mcollective/util/tasks_support.rb', line 553
def tasks(environment)
  resp = http_get("/puppet/v3/tasks?environment=%s" % [environment])

  # Report the HTTP response; $! is not set here because no exception was raised
  raise("Failed to retrieve task list: %s: %s" % [resp.code, resp.body]) unless resp.code == "200"

  tasks = JSON.parse(resp.body)

  tasks.sort_by {|t| t["name"]}
end
#tasks_compatible? ⇒ Boolean
Determines if a machine is compatible with running bolt
Note: this should do more to check for a compatible version of Puppet.
# File 'lib/mcollective/util/tasks_support.rb', line 61
def tasks_compatible?
  File.exist?(wrapper_path) && File.executable?(wrapper_path)
end
#validate_task_inputs(inputs, task) ⇒ Array[Boolean, Error]
Validates that the inputs would be acceptable to the task
Note: copied from PAL TaskSignature#runnable_with?
# File 'lib/mcollective/util/tasks_support.rb', line 647
def validate_task_inputs(inputs, task)
  return [true, ""] unless task["metadata"]["parameters"]
  return [true, ""] if task["metadata"]["parameters"].empty? && inputs.empty?

  require "puppet"

  signature = {}

  task["metadata"]["parameters"].each do |k, v|
    signature[k] = Puppet::Pops::Types::TypeParser.singleton.parse(v["type"])
  end

  signature_type = Puppet::Pops::Types::TypeFactory.struct(signature)

  return [true, ""] if signature_type.instance?(inputs)

  tm = Puppet::Pops::Types::TypeMismatchDescriber.singleton
  reason = tm.describe_struct_signature(signature_type, inputs).flatten.map(&:format).join("\n")
  reason = "\nInvalid input: \n\t%s" % [reason]

  [false, reason]
end
#wait_for_task_completion(requestid) ⇒ Object
Waits for a task to complete
# File 'lib/mcollective/util/tasks_support.rb', line 341
def wait_for_task_completion(requestid)
  sleep 0.1 until task_complete?(requestid)
end
#wrapper_path ⇒ String
Path to the task wrapper executable
# File 'lib/mcollective/util/tasks_support.rb', line 101
def wrapper_path
  @choria.get_option("choria.tasks.wrapper_path", aio_wrapper_path)
end