Class: MCollective::Util::TasksSupport

Inherits:
Object
  • Object
show all
Defined in:
lib/mcollective/util/tasks_support.rb,
lib/mcollective/util/tasks_support/cli.rb,
lib/mcollective/util/tasks_support/json_formatter.rb,
lib/mcollective/util/tasks_support/default_formatter.rb

Defined Under Namespace

Classes: CLI

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(choria, cache_dir = nil) ⇒ TasksSupport

Returns a new instance of TasksSupport.



10
11
12
13
# File 'lib/mcollective/util/tasks_support.rb', line 10

def initialize(choria, cache_dir=nil)
  @choria = choria
  @cache_dir = cache_dir || @choria.get_option("choria.tasks_cache")
end

Instance Attribute Details

#cache_dirObject (readonly)

Returns the value of attribute cache_dir.



8
9
10
# File 'lib/mcollective/util/tasks_support.rb', line 8

def cache_dir
  @cache_dir
end

#choriaObject (readonly)

Returns the value of attribute choria.



8
9
10
# File 'lib/mcollective/util/tasks_support.rb', line 8

def choria
  @choria
end

Instance Method Details

#aio?Boolean

Is this an AIO install?

Returns:

  • (Boolean)


94
95
96
# File 'lib/mcollective/util/tasks_support.rb', line 94

def aio?
  File.directory?(aio_bin_path)
end

#aio_bin_pathObject

AIO path to binaries like wrappers etc



66
67
68
69
70
71
72
# File 'lib/mcollective/util/tasks_support.rb', line 66

def aio_bin_path
  if Util.windows?
    'C:\Program Files\Puppet Labs\Puppet\puppet\bin'
  else
    "/opt/puppetlabs/puppet/bin"
  end
end

#aio_wrapper_pathString

Path to the AIO task wrapper executable

Returns:



77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/mcollective/util/tasks_support.rb', line 77

def aio_wrapper_path
  if Util.windows?
    legacy = File.join(aio_bin_path, "task_wrapper.exe")
    return legacy if File.exist?(legacy)

    File.join(aio_bin_path, "execution_wrapper.exe")
  else
    legacy = File.join(aio_bin_path, "task_wrapper")
    return legacy if File.exist?(legacy)

    File.join(aio_bin_path, "execution_wrapper")
  end
end

#cache_task_file(file) ⇒ Object

Note:

Does not first check if the cache is ok, unconditionally downloads

Attempts to download and cache the file

Parameters:

  • file (Hash)

    a file hash as per the task metadata

Raises:

  • (StandardError)

    when downloading fails

See Also:



711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
# File 'lib/mcollective/util/tasks_support.rb', line 711

def cache_task_file(file)
  path = [file["uri"]["path"], URI.encode_www_form(file["uri"]["params"])].join("?")

  file_name = task_file_name(file)

  Log.debug("Caching task to %s" % file_name)

  http_get(path, "Accept" => "application/octet-stream") do |resp|
    raise("Failed to request task content %s: %s: %s" % [path, resp.code, resp.body]) unless resp.code == "200"

    FileUtils.mkdir_p(cache_dir, mode: 0o0750) # rubocop:disable Style/HashSyntax
    FileUtils.rm_rf(file_name) if File.directory?(file_name)

    task_file = Tempfile.new("tasks_%s" % file["filename"])
    task_file.binmode

    resp.read_body do |segment|
      task_file.write(segment)
    end

    task_file.close

    FileUtils.chmod(0o0750, task_file.path)
    FileUtils.mv(task_file.path, file_name)
  end
end

#cached?(files) ⇒ Boolean

Note:

this checks all files, though for now there’s only ever one file

Given a task description checks all files are correctly cached

Parameters:

  • files (Array)

    files list

Returns:

  • (Boolean)

See Also:



141
142
143
# File 'lib/mcollective/util/tasks_support.rb', line 141

def cached?(files)
  files.map {|f| task_file?(f)}.all?
end

#cli(format, verbose) ⇒ CLI

Creates an instance of the CLI helpers

Parameters:

  • format (:json, :default)

    the output format to use

Returns:



19
20
21
22
# File 'lib/mcollective/util/tasks_support.rb', line 19

def cli(format, verbose)
  require_relative "tasks_support/cli"
  CLI.new(self, format, verbose)
end

#create_request_spooldir(requestid, task) ⇒ String

Generates the spool path and create it

Parameters:

  • requestid (String)

    unique mco request id

  • task (Hash)

    task specification

Returns:

  • (String)

    path to the spool dir

Raises:

  • (StandardError)

    should it not be able to make the directory



218
219
220
221
222
223
224
225
226
# File 'lib/mcollective/util/tasks_support.rb', line 218

def create_request_spooldir(requestid, task)
  dir = request_spooldir(requestid)

  FileUtils.mkdir_p(dir, mode: 0o0750) # rubocop:disable Style/HashSyntax

  populate_spooldir(dir, task)

  dir
end

#create_task_stdout(stdout, completed, exitcode, wrapper_output) ⇒ Object

Parses the stdout and turns it into a JSON object

If the output is JSON parsable the output is returned else it’s wrapped in _output as per the Tasks spec version 1

Parameters:

  • stdout (String)

    the stdout from the script

  • completed (Boolean)

    if the task is done running

  • exitcode (Integer)

    the exitcode from the script

  • wrapper_output (String)

    the wrapper output

Returns:

  • (Object)

    the new stdout according to spec and the stdout object, not JSON encoded



431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
# File 'lib/mcollective/util/tasks_support.rb', line 431

def create_task_stdout(stdout, completed, exitcode, wrapper_output)
  result = {}

  unless wrapper_output.empty?
    result["_error"] = {
      "kind" => "choria.tasks/wrapper-error",
      "msg" => "The task wrapper failed to run",
      "details" => {
        "wrapper_output" => wrapper_output
      }
    }

    return result.to_json
  end

  begin
    data = JSON.parse(stdout)

    if data.is_a?(Hash)
      result = data
    else
      result["_output"] = stdout
    end
  rescue
    result["_output"] = stdout
  end

  if exitcode != 0 && completed && !result["_error"]
    result["_error"] = {
      "kind" => "choria.tasks/task-error",
      "msg" => "The task errored with a code %d" % exitcode,
      "details" => {
        "exitcode" => exitcode
      }
    }
  end

  result
end

#download_files(files) ⇒ Boolean

Downloads and caches a file set

Parameters:

  • files (Array)

    the files description

Returns:

  • (Boolean)

    indicating download success

Raises:

  • (StandardError)

    on download failures



743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
# File 'lib/mcollective/util/tasks_support.rb', line 743

def download_files(files)
  Log.info("Downloading %d task files" % files.size)

  files.each do |file|
    next if task_file?(file)

    try = 0

    begin
      return false if try == 2

      try += 1

      Log.debug("Downloading task file %s (try %s/2)" % [file["filename"], try])

      cache_task_file(file)
    rescue
      Log.error(msg = "Could not download task file: %s: %s" % [$!.class, $!.to_s])

      sleep 0.1

      retry if try < 2

      raise(msg)
    end
  end

  true
end

#file_sha256(file_path) ⇒ String

Calculates a hex digest SHA256 for a specific file

Parameters:

  • file_path (String)

    a full path to the file to check

Returns:

Raises:

  • (StandardError)

    when the file does not exist



675
676
677
# File 'lib/mcollective/util/tasks_support.rb', line 675

def file_sha256(file_path)
  Digest::SHA256.file(file_path).hexdigest
end

#file_size(file_path) ⇒ Integer

Determines the file size of a specific file

Parameters:

  • file_path (String)

    a full path to the file to check

Returns:

  • (Integer)

    bytes, -1 when the file does not exist



683
684
685
686
687
# File 'lib/mcollective/util/tasks_support.rb', line 683

def file_size(file_path)
  File.stat(file_path).size
rescue
  -1
end

#http_get(path, headers = {}, &blk) ⇒ Net::HTTPRequest

Does a HTTP GET against the Puppet Server

Parameters:

  • path (String)

    the path to get

  • headers (Hash) (defaults to: {})

    headers to passs

Returns:

  • (Net::HTTPRequest)


607
608
609
610
611
612
613
614
615
616
# File 'lib/mcollective/util/tasks_support.rb', line 607

def http_get(path, headers={}, &blk)
  transport = choria.https(choria.puppet_server, true)
  request = choria.http_get(path)

  headers.each do |k, v|
    request[k] = v
  end

  transport.request(request, &blk)
end

#parse_task(task) ⇒ Array<String>

Parse a task name like module::task into it’s 2 pieces

Parameters:

Returns:

  • (Array<String>)

    2 part array, first the module name then the task name

Raises:

  • (StandardError)

    for invalid task names



577
578
579
580
581
582
583
# File 'lib/mcollective/util/tasks_support.rb', line 577

def parse_task(task)
  parts = task.split("::")

  parts << "init" if parts.size == 1

  parts
end

#platform_specific_command(path) ⇒ Object

Expands the path into a platform specific version

Parameters:

  • path (Array<String>)

    the path to the executable and any arguments

Raises:

  • (StandardError)

    when execution of a specific file is not supported

See Also:



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/mcollective/util/tasks_support.rb', line 117

def platform_specific_command(path)
  return [path] unless Util.windows?

  extension = File.extname(path)

  # https://github.com/puppetlabs/pxp-agent/blob/3e7cada3cedf7f78703781d44e70010d0c5ad209/lib/src/modules/task.cc#L98-L107
  case extension
  when ".rb"
    ["ruby", path]
  when ".pp"
    ["puppet", "apply", path]
  when ".ps1"
    ["powershell", "-NoProfile", "-NonInteractive", "-NoLogo", "-ExecutionPolicy", "Bypass", "-File", path]
  else
    [path]
  end
end

#populate_spooldir(spooldir, task) ⇒ Object

Copy task files to the spool directory

Parameters:

  • spooldir (String)

    path to the spool dir

  • task (Hash)

    task specification



231
232
233
234
235
236
237
238
239
240
241
# File 'lib/mcollective/util/tasks_support.rb', line 231

def populate_spooldir(spooldir, task)
  task["files"].each do |file|
    filename = file["filename"]
    filename = File.join(task_module(task["task"]), "tasks", filename) unless filename.index("/")

    spool_filename = File.join(spooldir, "files", filename)

    FileUtils.mkdir_p(File.dirname(spool_filename), mode: 0o0750) # rubocop:disable Style/HashSyntax
    FileUtils.cp(task_file_name(file), spool_filename)
  end
end

#ps_shim_pathString

Path to the powershell shim for powershell input method

Returns:



108
109
110
# File 'lib/mcollective/util/tasks_support.rb', line 108

def ps_shim_path
  File.join(aio_bin_path, "PowershellShim.ps1")
end

#puppet_type_to_ruby(type) ⇒ Class, Boolean

Converts a Puppet type into something mcollective understands

This is inevitably hacky by its nature, there is no way for me to parse the types. PAL might get some helpers for this but till then this is going to have to be best efforts.

When there is a too complex situation users can always put in –input and some JSON to work around it until something better comes around

Parameters:

  • type (String)

    a puppet type

Returns:

  • (Class, Boolean, Boolean)

    The data type, if its an array input or not and if its required



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/mcollective/util/tasks_support.rb', line 35

def puppet_type_to_ruby(type)
  array = false
  required = true

  if type =~ /Optional\[(.+)/
    type = $1
    required = false
  end

  if type =~ /Array\[(.+)/
    type = $1
    array = true
  end

  return [Numeric, array, required] if type =~ /Integer/
  return [Numeric, array, required] if type =~ /Float/
  return [Hash, array, required] if type =~ /Hash/
  return [:boolean, array, required] if type =~ /Boolean/

  [String, array, required]
end

#request_spooldir(requestid) ⇒ String

Generate the path to the spool for a specific request

Parameters:

  • requestid (String)

    task id

Returns:



208
209
210
# File 'lib/mcollective/util/tasks_support.rb', line 208

def request_spooldir(requestid)
  File.join(choria.tasks_spool_dir, requestid)
end

#run_task_command(requestid, task, wait = true, callerid = "local") ⇒ Hash

Note:

before this should be run be sure to download the tasks first

Given a task spec runs it via the Puppet wrappers

The task is run in the background and this method waits for it to finish, but should the thread this method runs in be killed the process will continue and one can later check again using the request id

Parameters:

  • requestid (String)

    the task requestid

  • task (Hash)

    task specification

  • wait (Boolean) (defaults to: true)

    should the we wait for the task to complete

  • callerid (String) (defaults to: "local")

    the mcollective callerid who is running the task

Returns:

  • (Hash)

    the task result as per #task_result

Raises:

  • (StandardError)

    when calling the wrapper fails etc



358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
# File 'lib/mcollective/util/tasks_support.rb', line 358

def run_task_command(requestid, task, wait=true, callerid="local")
  raise("The task wrapper %s does not exist, please upgrade Puppet" % wrapper_path) unless File.exist?(wrapper_path)
  raise("Task %s is not available or does not match the specification, please download it" % task["task"]) unless cached?(task["files"])
  raise("Task spool for request %s already exist, cannot rerun", requestid) if task_ran?(requestid)

  spool = create_request_spooldir(requestid, task)
  command = task_command(spool, task)

  Log.debug("Trying to spawn task %s in spool %s using command %s" % [task["task"], spool, command])

  wrapper_input = {
    "executable" => command[0],
    "arguments" => command[1..-1],
    "input" => task_input(task),
    "stdout" => File.join(spool, "stdout"),
    "stderr" => File.join(spool, "stderr"),
    "exitcode" => File.join(spool, "exitcode")
  }

  File.open(File.join(spool, "choria.json"), "w") do |meta|
    data = {
      "start_time" => Time.now.utc.to_i,
      "caller" => callerid,
      "task" => task["task"],
      "request" => wrapper_input
    }

    meta.print(data.to_json)
  end

  pid = spawn_command(wrapper_path, task_environment(task, requestid, callerid), wrapper_input.to_json, spool, task["run_as"])

  Log.info("Spawned task %s in spool %s with pid %s" % [task["task"], spool, pid])

  wait_for_task_completion(requestid) if wait

  task_status(requestid)
end

#spawn_command(command, environment, stdin, spooldir, run_as) ⇒ Integer

Runs the wrapper command detached from mcollective

We always detach we have no idea how long these tasks will run since people can do whatever they like, we’ll then check them till the agent timeout but if timeout happens they keep running

The idea is that UI will in that case present the user with a request id - which is also the spool name - and the user can later come and act on these tasks either by asking for their status or perhaps killing them?

Parameters:

  • command (String)

    command to run

  • environment (Hash)

    environment to run with

  • stdin (String)

    stdin to send to the command

  • spooldir (String)

    path to the spool for this specific request

  • run_as (String)

    name of the user who will run the command

Returns:

  • (Integer)

    the pid that was spawned



268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
# File 'lib/mcollective/util/tasks_support.rb', line 268

def spawn_command(command, environment, stdin, spooldir, run_as)
  wrapper_input = File.join(spooldir, "wrapper_stdin")
  wrapper_stdout = File.join(spooldir, "wrapper_stdout")
  wrapper_stderr = File.join(spooldir, "wrapper_stderr")
  wrapper_pid = File.join(spooldir, "wrapper_pid")

  options = {
    :chdir => "/",
    :in => :close,
    :out => wrapper_stdout,
    :err => wrapper_stderr
  }

  if stdin
    File.open(wrapper_input, "w") {|i| i.print(stdin) }
    options[:in] = wrapper_input
  end

  if run_as
    raise("System does not allow forking.  run_as not usable.") unless Process.respond_to?(:fork)

    require "etc"

    u = Etc.getpwnam(run_as)

    FileUtils.chown_R(u.uid, u.gid, spooldir)

    pid = Process.fork
    if pid.nil?
      Process.gid = Process.egid = u.gid
      Process.uid = Process.euid = u.uid
      ENV.delete_if { |name| name !~ /^LC_/ }
      Process.exec(environment, command, options)
    end
  else
    pid = Process.spawn(environment, [command, command], options)
  end

  sleep 0.1 until File.exist?(wrapper_stdout)

  File.open(wrapper_pid, "w") {|p| p.write(pid)}

  Process.detach(pid)

  pid
end

#task_command(spooldir, task) ⇒ String

Given a task spec figures out the command to run using the wrapper

Parameters:

  • spooldir (String)

    path to the spool for this specific request

  • task (Hash)

    task specification

Returns:

  • (String)

    path to the command



165
166
167
168
169
170
171
172
173
174
# File 'lib/mcollective/util/tasks_support.rb', line 165

def task_command(spooldir, task)
  file_spec = task["files"][0]
  file_name = File.join(spooldir, "files", task_module(task["task"]), "tasks", file_spec["filename"])

  command = platform_specific_command(file_name)

  command.unshift(ps_shim_path) if task_input_method(task) == "powershell"

  command
end

#task_complete?(requestid) ⇒ Boolean

Determines if a task is completed

Tasks are run under the wrapper which will write the existcode to a file only after the command have exited, so this will wait for that to appear

Parameters:

  • requestid (String)

    request id for the task

Returns:

  • (Boolean)


331
332
333
334
335
336
# File 'lib/mcollective/util/tasks_support.rb', line 331

def task_complete?(requestid)
  exitcode = File.join(request_spooldir(requestid), "exitcode")
  wrapper_stderr = File.join(request_spooldir(requestid), "wrapper_stderr")

  File.exist?(wrapper_stderr) && file_size(wrapper_stderr) > 0 || File.exist?(exitcode) && file_size(exitcode) > 0
end

#task_environment(task, task_id, task_caller) ⇒ Hash

Given a task spec calculates the correct environment hash

Parameters:

  • task (Hash)

    task specification

  • task_id (String)

    task id - usually the mcollective request id

  • task_caller (String)

    the caller invoking the task

Returns:

  • (Hash)


182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/mcollective/util/tasks_support.rb', line 182

def task_environment(task, task_id, task_caller)
  environment = {
    "_task" => task["task"],
    "_choria_task_id" => task_id,
    "_choria_task_caller" => task_caller
  }

  environment["PATH"] = "#{aio_bin_path}#{File::PATH_SEPARATOR}#{ENV['PATH']}" if aio?

  return environment unless task["input"]
  return environment unless ["both", "environment"].include?(task_input_method(task))

  JSON.parse(task["input"]).each do |k, v|
    environment["PT_%s" % k] = v.to_s
  end

  environment["PT__task"] = task["task"]
  environment["PT__installdir"] = File.join(request_spooldir(task_id), "files")

  environment
end

#task_failed?(status) ⇒ Boolean

Determines if a task failed based on its status

Parameters:

Returns:

  • (Boolean)


475
476
477
478
479
480
481
482
483
# File 'lib/mcollective/util/tasks_support.rb', line 475

def task_failed?(status)
  return true unless status["wrapper_spawned"]
  return true unless status["wrapper_pid"]
  return true unless status["wrapper_error"].empty?
  return true if status["exitcode"] != 0 && status["completed"]
  return true if status["stdout"].include?("_error")

  false
end

#task_file?(file) ⇒ Boolean

Validates a task cache file

Parameters:

  • file (Hash)

    a file hash as per the task metadata

Returns:

  • (Boolean)


693
694
695
696
697
698
699
700
701
702
703
# File 'lib/mcollective/util/tasks_support.rb', line 693

def task_file?(file)
  file_name = task_file_name(file)

  Log.debug("Checking if file %s is cached using %s" % [file_name, file.pretty_inspect])

  return false unless File.exist?(file_name)
  return false unless file_size(file_name) == file["size_bytes"]
  return false unless file_sha256(file_name) == file["sha256"]

  true
end

#task_file_name(file) ⇒ String

Determines the cache path for a task file

Parameters:

  • file (Hash)

    a file hash as per the task metadata

Returns:

  • (String)

    the directory the file would go into



598
599
600
# File 'lib/mcollective/util/tasks_support.rb', line 598

def task_file_name(file)
  File.join(cache_dir, file["sha256"])
end

#task_input(task) ⇒ Hash?

Given a task spec, creates the standard input

Parameters:

  • task (Hash)

    task specification

Returns:

  • (Hash, nil)


247
248
249
# File 'lib/mcollective/util/tasks_support.rb', line 247

def task_input(task)
  task["input"] if ["both", "powershell", "stdin"].include?(task_input_method(task))
end

#task_input_method(task) ⇒ "powershell", ...

Given a task spec figures out the input method

Parameters:

  • task (Hash)

    task specification

Returns:

  • ("powershell", "both", "stdin", "environment")


149
150
151
152
153
154
155
156
157
158
# File 'lib/mcollective/util/tasks_support.rb', line 149

def task_input_method(task)
  # the spec says only 1 executable, no idea what the point of the 'files' is
  file_extension = File.extname(task["files"][0]["filename"])

  input_method = task["input_method"]
  input_method = "powershell" if input_method.nil? && file_extension == ".ps1"
  input_method ||= "both"

  input_method
end

#task_metadata(task, environment) ⇒ Hash

Requests a task metadata from Puppet Server

Parameters:

  • task (String)

    a task name like module::task

  • environment (String)

    the puppet environmnet like production

Returns:

  • (Hash)

    the metadata according to the v3 spec

Raises:

  • (StandardError)

    when the request failed



624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
# File 'lib/mcollective/util/tasks_support.rb', line 624

def (task, environment)
  parsed = parse_task(task)
  path = "/puppet/v3/tasks/%s/%s?environment=%s" % [parsed[0], parsed[1], environment]

  resp = http_get(path)

  raise("Failed to request task metadata: %s: %s" % [resp.code, resp.body]) unless resp.code == "200"

  result = JSON.parse(resp.body)

  result["metadata"] ||= {}
  result["metadata"]["parameters"] ||= {}
  result["files"] ||= []

  result
end

#task_module(task) ⇒ String

Return a task’s module

Parameters:

Returns:

  • (String)

    the module name

Raises:

  • (StandardError)

    for invalid task names



590
591
592
# File 'lib/mcollective/util/tasks_support.rb', line 590

def task_module(task)
  parse_task(task)[0]
end

#task_names(environment) ⇒ Array<String>

Retrieves the list of known task names

Parameters:

  • environment (String)

    the environment to query

Returns:

Raises:

  • (StandardError)

    on http failure



568
569
570
# File 'lib/mcollective/util/tasks_support.rb', line 568

def task_names(environment)
  tasks(environment).map {|t| t["name"]}
end

#task_ran?(requestid) ⇒ Boolean

Determines if a task already ran by checkinf if its spool exist

Parameters:

  • requestid (String)

    request id for the task

Returns:

  • (Boolean)


319
320
321
# File 'lib/mcollective/util/tasks_support.rb', line 319

def task_ran?(requestid)
  File.directory?(request_spooldir(requestid))
end

#task_runtime(requestid) ⇒ Float

Determines how long a task ran for

Tasks that had wrapper failures will have a 0 run time, still running tasks will calculate runtime till now and so increase on each invocation

Parameters:

  • requestid (String)

    the request if for the task

Returns:

  • (Float)


405
406
407
408
409
410
411
412
413
414
415
416
417
418
# File 'lib/mcollective/util/tasks_support.rb', line 405

def task_runtime(requestid)
  spool = request_spooldir(requestid)
  wrapper_stderr = File.join(spool, "wrapper_stderr")
  wrapper_pid = File.join(spool, "wrapper_pid")
  exitcode = File.join(spool, "exitcode")

  if task_complete?(requestid) && File.exist?(exitcode)
    Float(File::Stat.new(exitcode).mtime - File::Stat.new(wrapper_pid).mtime)
  elsif task_complete?(requestid) && file_size(wrapper_stderr) > 0
    0.0
  else
    Float(Time.now - File::Stat.new(wrapper_pid).mtime)
  end
end

#task_status(requestid) ⇒ Hash

Determines the task status for given request

Parameters:

  • requestid (String)

    request id for the task

Returns:

  • (Hash)

    the task status



489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
# File 'lib/mcollective/util/tasks_support.rb', line 489

def task_status(requestid)
  raise("Task %s have not been requested" % requestid) unless task_ran?(requestid)

  spool = request_spooldir(requestid)
  stdout = File.join(spool, "stdout")
  stderr = File.join(spool, "stderr")
  exitcode = File.join(spool, "exitcode")
  wrapper_stderr = File.join(spool, "wrapper_stderr")
  wrapper_pid = File.join(spool, "wrapper_pid")
  meta = File.join(spool, "choria.json")

  result = {
    "spool" => spool,
    "task" => nil,
    "caller" => nil,
    "stdout" => "",
    "stderr" => "",
    "exitcode" => -1,
    "runtime" => task_runtime(requestid),
    "start_time" => Time.at(0).utc,
    "wrapper_spawned" => false,
    "wrapper_error" => "",
    "wrapper_pid" => nil,
    "completed" => task_complete?(requestid)
  }

  result["exitcode"] = Integer(File.read(exitcode)) if File.exist?(exitcode)

  if task_ran?(requestid)
    result["stdout"] = File.read(stdout) if File.exist?(stdout)
    result["stderr"] = File.read(stderr) if File.exist?(stderr)
    result["wrapper_spawned"] = File.exist?(wrapper_stderr) && file_size(wrapper_stderr) == 0

    result["wrapper_error"] = File.read(wrapper_stderr) if File.exist?(wrapper_stderr) && file_size(wrapper_stderr) > 0

    if File.exist?(wrapper_pid) && file_size(wrapper_pid) > 0
      result["start_time"] = File::Stat.new(wrapper_pid).mtime.utc
      result["wrapper_pid"] = Integer(File.read(wrapper_pid))
    end
  end

  if File.exist?(meta)
     = JSON.parse(File.read(meta))

    result["start_time"] = Time.at(["start_time"]).utc
    result["caller"] = ["caller"]
    result["task"] = ["task"]
  end

  result["stdout"] = create_task_stdout(
    result["stdout"],
    result["completed"],
    result["exitcode"],
    result["wrapper_error"]
  )

  result
end

#tasks(environment) ⇒ Hash

Retrieves the list of known tasks in an environment

Parameters:

  • environment (String)

    the environment to query

Returns:

  • (Hash)

    the v3 task list

Raises:

  • (StandardError)

    on http failure



553
554
555
556
557
558
559
560
561
# File 'lib/mcollective/util/tasks_support.rb', line 553

def tasks(environment)
  resp = http_get("/puppet/v3/tasks?environment=%s" % [environment])

  raise("Failed to retrieve task list: %s: %s" % [$!.class, $!.to_s]) unless resp.code == "200"

  tasks = JSON.parse(resp.body)

  tasks.sort_by {|t| t["name"]}
end

#tasks_compatible?Boolean

Note:

this should check for a compatible version of Puppet more

Determines if a machine is compatible with running bolt

Returns:

  • (Boolean)


61
62
63
# File 'lib/mcollective/util/tasks_support.rb', line 61

def tasks_compatible?
  File.exist?(wrapper_path) && File.executable?(wrapper_path)
end

#validate_task_inputs(inputs, task) ⇒ Array[Boolean, Error]

Note:

Copied from PAL TaskSignature#runnable_with?

Validates that the inputs would be acceptable to the task

Parameters:

  • inputs (Hash)

    of keys and values

  • task (Hash)

    task metadata

Returns:

  • (Array[Boolean, Error])


647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
# File 'lib/mcollective/util/tasks_support.rb', line 647

def validate_task_inputs(inputs, task)
  return [true, ""] unless task["metadata"]["parameters"]
  return [true, ""] if task["metadata"]["parameters"].empty? && inputs.empty?

  require "puppet"

  signature = {}

  task["metadata"]["parameters"].each do |k, v|
    signature[k] = Puppet::Pops::Types::TypeParser.singleton.parse(v["type"])
  end

  signature_type = Puppet::Pops::Types::TypeFactory.struct(signature)

  return [true, ""] if signature_type.instance?(inputs)

  tm = Puppet::Pops::Types::TypeMismatchDescriber.singleton
  reason = tm.describe_struct_signature(signature_type, inputs).flatten.map(&:format).join("\n")
  reason = "\nInvalid input: \n\t%s" % [reason]

  [false, reason]
end

#wait_for_task_completion(requestid) ⇒ Object

Waits for a task to complete

Parameters:

  • requestid (String)

    request id for the task



341
342
343
# File 'lib/mcollective/util/tasks_support.rb', line 341

def wait_for_task_completion(requestid)
  sleep 0.1 until task_complete?(requestid)
end

#wrapper_pathString

Path to the task wrapper executable

Returns:



101
102
103
# File 'lib/mcollective/util/tasks_support.rb', line 101

def wrapper_path
  @choria.get_option("choria.tasks.wrapper_path", aio_wrapper_path)
end