Class: Mortar::Command::Jobs

Inherits:
  Base
    • Object
Includes:
Git
Defined in:
lib/mortar/command/jobs.rb

Overview

Run and view the status of Pig jobs (run, status).

Constant Summary

CLUSTER_TYPE__SINGLE_JOB = 'single_job'
CLUSTER_TYPE__PERSISTENT = 'persistent'
CLUSTER_TYPE__PERMANENT = 'permanent'
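
These constants name the cluster lifetime a job requests when it launches a new cluster. As a rough illustration (not part of the documented API), the mapping #run applies from its parsed CLI flags to these values looks like the following sketch, where options stands in for the command's parsed flags:

  options = { :singlejobcluster => false, :permanentcluster => false }  # example parsed flags

  cluster_type = CLUSTER_TYPE__PERSISTENT        # default: reusable cluster, idles out after an hour
  if options[:singlejobcluster]                  # -1 / --singlejobcluster
    cluster_type = CLUSTER_TYPE__SINGLE_JOB
  elsif options[:permanentcluster]               # -2 / --permanentcluster
    cluster_type = CLUSTER_TYPE__PERMANENT
  end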

Instance Attribute Summary

Attributes inherited from Base

#args, #options

Instance Method Summary

Methods inherited from Base

#api, #ask_public, #config_parameters, #get_error_message_context, #git, #initialize, #initialize_embedded_project, #luigi_parameters, namespace, #pig_parameters, #project, #register_api_call, #register_do, #register_project, #spark_script_arguments, #validate_project_name, #validate_project_structure

Methods included from Helpers

#action, #ask, #confirm, #copy_if_not_present_at_dest, #default_host, #deprecate, #display, #display_header, #display_object, #display_row, #display_table, #display_with_indent, #download_to_file, #ensure_dir_exists, #error, error_with_failure, error_with_failure=, extended, extended_into, #format_bytes, #format_date, #format_with_bang, #full_host, #get_terminal_environment, #home_directory, #host, #hprint, #hputs, included, included_into, #installed_with_omnibus?, #json_decode, #json_encode, #line_formatter, #longest, #output_with_bang, #pending_github_team_state_message, #quantify, #redisplay, #retry_on_exception, #running_on_a_mac?, #running_on_windows?, #set_buffer, #shell, #spinner, #string_distance, #styled_array, #styled_error, #styled_hash, #styled_header, #suggestion, #test_name, #ticking, #time_ago, #truncate, #warning, #with_tty, #write_to_file

Constructor Details

This class inherits a constructor from Mortar::Command::Base

Instance Method Details

#add_pig_job_fields(job_status, job_display_entries) ⇒ Object



# File 'lib/mortar/command/jobs.rb', line 240

def add_pig_job_fields(job_status, job_display_entries)
  if job_status["outputs"] && job_status["outputs"].length > 0
    job_display_entries["outputs"] = Hash.new { |h,k| h[k] = [] }
    job_status["outputs"].select{|o| o["alias"]}.collect{ |output|
      output_hash = {}
      output_hash["location"] = output["location"] if output["location"]
      output_hash["records"] = output["records"] if output["records"]
      [output['alias'], output_hash]
    }.each{ |k,v| job_display_entries["outputs"][k] << v }
  end

  if job_status["num_hadoop_jobs"] && job_status["num_hadoop_jobs_succeeded"]
    job_display_entries["progress"] = "#{job_status["progress"]}%"
    job_display_entries["hadoop jobs complete"] = 
      '%0.2f / %0.2f' % [job_status["num_hadoop_jobs_succeeded"], job_status["num_hadoop_jobs"]]
  elsif job_status["num_hadoop_jobs_succeeded"]
    job_display_entries["progress"] = '%0.2f MapReduce Jobs complete.' % job_status["num_hadoop_jobs_succeeded"]
  else
    job_display_entries["progress"] = "#{job_status["progress"]}%"
  end
end
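
For illustration, a minimal standalone sketch (assuming the method above is in scope) of what it adds to job_display_entries for a hypothetical job_status payload; the field values below are invented:

  job_status = {
    "outputs" => [
      { "alias" => "coefficients", "location" => "s3n://example-bucket/output", "records" => 42 }
    ],
    "progress" => 60,
    "num_hadoop_jobs" => 5,
    "num_hadoop_jobs_succeeded" => 3
  }
  job_display_entries = {}
  add_pig_job_fields(job_status, job_display_entries)
  # job_display_entries now contains roughly:
  #   "outputs"              => { "coefficients" => [{ "location" => "s3n://example-bucket/output", "records" => 42 }] }
  #   "progress"             => "60%"
  #   "hadoop jobs complete" => "3.00 / 5.00"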

#display_job_status(job_status) ⇒ Object

Inner function to display the hash table when the job is complete



# File 'lib/mortar/command/jobs.rb', line 263

def display_job_status(job_status)
  if (job_status['job_type'] == Mortar::API::Jobs::JOB_TYPE_LUIGI)
    job_display_entries = luigi_job_display_entries(job_status)
  else
    job_display_entries = pig_job_display_entries(job_status)
    add_pig_job_fields(job_status, job_display_entries)
  end
  
  unless job_status["error"].nil? || job_status["error"]["message"].nil?
    error_context = get_error_message_context(job_status["error"]["message"])
    unless error_context == ""
      job_status["error"]["help"] = error_context
    end
    job_status["error"].each_pair do |key, value|
      job_display_entries["error - #{key}"] = value
    end
  end

  styled_header("#{job_status["display_name"]} (job_id: #{job_status["job_id"]})")
  styled_hash(job_display_entries)
end

#index ⇒ Object

jobs

Show recent and running jobs.

-l, --limit LIMITJOBS   # Limit the number of jobs returned (defaults to 10)
-s, --skip SKIPJOBS     # Skip a certain number of jobs (defaults to 0)

Examples:

List the last 20 jobs:
     $ mortar jobs -l 20
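
Skip the first 10 jobs and list the next 20 (an additional example using the documented --skip flag):
     $ mortar jobs -l 20 -s 10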


# File 'lib/mortar/command/jobs.rb', line 41

def index
  validate_arguments!

  options[:limit] ||= '10'
  options[:skip] ||= '0'
  jobs = api.get_jobs(options[:skip], options[:limit]).body['jobs']
  jobs.each do |job|
    if job['start_timestamp']
      job['start_timestamp'] = Time.parse(job['start_timestamp']).strftime('%A, %B %e, %Y, %l:%M %p')
    end
  end
  headers = [ 'job_id', 'script' , 'status' , 'start_date' , 'elapsed_time' , 'cluster_size' , 'cluster_id']
  columns = [ 'job_id', 'display_name', 'status_description', 'start_timestamp', 'duration', 'cluster_size', 'cluster_id']
  display_table(jobs, columns, headers)
end
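
As a side note, a small standalone snippet showing the strftime format applied to start_timestamp above (the timestamp is made up; output shown assumes UTC):

  require 'time'
  Time.parse('2014-03-07T15:30:00Z').strftime('%A, %B %e, %Y, %l:%M %p')
  # => e.g. "Friday, March  7, 2014,  3:30 PM"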

#luigi_job_display_entries(job_status) ⇒ Object



# File 'lib/mortar/command/jobs.rb', line 224

def luigi_job_display_entries(job_status)
  status = job_status['status_description']
  if !job_status['status_detail_description'].nil?
    status += " - #{job_status['status_detail_description']}"
  end

  job_display_entries = {
    "status" => status,
    "job submitted at" => job_status["start_timestamp"],
    "job began running at" => job_status["running_timestamp"],
    "job finished at" => job_status["stop_timestamp"],
    "job running for" => job_status["duration"],
    "job run with parameters" => job_status["parameters"],
  }
end

#pig_job_display_entries(job_status) ⇒ Object



# File 'lib/mortar/command/jobs.rb', line 212

def pig_job_display_entries(job_status)
  job_display_entries = {
    "status" => job_status["status_description"],
    "cluster_id" => job_status["cluster_id"],
    "job submitted at" => job_status["start_timestamp"],
    "job began running at" => job_status["running_timestamp"],
    "job finished at" => job_status["stop_timestamp"],
    "job running for" => job_status["duration"],
    "job run with parameters" => job_status["parameters"],
  }
end

#run ⇒ Object

jobs:run SCRIPT

Run a job on a Mortar Hadoop cluster.

-c, --clusterid CLUSTERID      # Run job on an existing cluster with ID of CLUSTERID (optional)
-s, --clustersize NUMNODES     # Run job on a new cluster, with NUMNODES nodes (optional; must be >= 2 if provided)
-1, --singlejobcluster         # Stop the cluster after the job completes. (Default: false; the cluster can be used for other jobs, and will shut down after 1 hour of inactivity)
-2, --permanentcluster         # Don't automatically stop the cluster after it has been idle for an hour. (Default: false; the cluster will be shut down after 1 hour of inactivity)
-3, --spot                     # Use spot instances for this cluster. (Default: false; only applicable to new clusters)
-p, --parameter NAME=VALUE     # Set a pig parameter value in your script.
-f, --param-file PARAMFILE     # Load pig parameter values from a file.
-d, --donotnotify              # Don't send an email on job completion. (Default: false; an email will be sent to you once the job completes)
-P, --project PROJECTNAME      # Use a project that is not checked out in the current directory. Runs code from the project's master branch on GitHub rather than snapshotting local code.
-B, --branch BRANCHNAME        # Used with --project to specify a non-master branch
-g, --pigversion PIG_VERSION   # Set pig version. Options are <PIG_VERSION_OPTIONS>.

Examples:

Run the generate_regression_model_coefficients pigscript on a 3 node cluster.
    $ mortar jobs:run pigscripts/generate_regression_model_coefficients.pig --clustersize 3

Run the regression_controller control script on a 3 node cluster.
    $ mortar jobs:run controlscripts/regression_controller.py --clustersize 3
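
Run on an existing cluster, passing a pig parameter (CLUSTER_ID and MY_PARAM are placeholders):
    $ mortar jobs:run pigscripts/generate_regression_model_coefficients.pig --clusterid CLUSTER_ID --parameter MY_PARAM=my_value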


# File 'lib/mortar/command/jobs.rb', line 80

def run
  script_name = shift_argument
  unless script_name
    error("Usage: mortar jobs:run SCRIPT\nMust specify SCRIPT.")
  end
  
  validate_arguments!
  if options[:project]
    project_name = options[:project]

    if File.extname(script_name) == ".pig"
      is_control_script = false
      script_name = File.basename(script_name, ".*")
    elsif File.extname(script_name) == ".py"
      is_control_script = true
      script_name = File.basename(script_name, ".*")
    else
      error "Unable to guess script type (controlscript vs pigscript).\n" + 
        "When running a script with the --project option, please provide the full path and filename, e.g.\n" +
        " mortar run pigscripts/#{script_name}.pig --project #{project_name}"
    end
  else
    project_name = project.name
    script = validate_script!(script_name)

    script_name = script.name
    case script
    when Mortar::Project::PigScript
      is_control_script = false
    when Mortar::Project::ControlScript
      is_control_script = true
    else
      error "Unknown Script Type"
    end
  end
  
  unless options[:clusterid] || options[:clustersize]
    clusters = api.get_clusters(pig_version.cluster_backend).body['clusters']

    largest_free_cluster = clusters.select{ |c| \
      c['running_jobs'].length == 0 && c['status_code'] == Mortar::API::Clusters::STATUS_RUNNING }.
      max_by{|c| c['size']}

    if largest_free_cluster.nil?
      options[:clustersize] = 2
      display("Defaulting to running job on new cluster of size 2")
    else
      options[:clusterid] = largest_free_cluster['cluster_id']
      display("Defaulting to running job on largest existing free cluster, id = " + 
              largest_free_cluster['cluster_id'] + ", size = " + largest_free_cluster['size'].to_s)
    end
  end
    
  if options[:clusterid]
    [:clustersize, :singlejobcluster, :permanentcluster].each do |opt|
      unless options[opt].nil?
        error("Option #{opt.to_s} cannot be set when running a job on an existing cluster (with --clusterid option)")
      end
    end
  end
 
  if options[:project]
    if options[:branch]
      git_ref = options[:branch]
    else
      git_ref = "master"
    end
  else
    git_ref = sync_code_with_cloud()
  end
  
  notify_on_job_finish = ! options[:donotnotify]
  
  # post job to API    
  response = action("Requesting job execution") do
    if options[:clustersize]
      if options[:singlejobcluster] && options[:permanentcluster]
        error("Cannot declare cluster as both --singlejobcluster and --permanentcluster")
      end
      cluster_size = options[:clustersize].to_i
      cluster_type = CLUSTER_TYPE__PERSISTENT
      if options[:singlejobcluster]
        cluster_type = CLUSTER_TYPE__SINGLE_JOB
      elsif options[:permanentcluster]
        cluster_type = CLUSTER_TYPE__PERMANENT
      end
      use_spot_instances = options[:spot] || false
      api.post_pig_job_new_cluster(project_name, script_name, git_ref, cluster_size, 
        :pig_version => pig_version.version, 
        :project_script_path => script.rel_path,
        :parameters => pig_parameters,
        :cluster_type => cluster_type,
        :notify_on_job_finish => notify_on_job_finish,
        :is_control_script => is_control_script,
        :use_spot_instances => use_spot_instances).body
    else
      cluster_id = options[:clusterid]
      api.post_pig_job_existing_cluster(project_name, script_name, git_ref, cluster_id,
        :pig_version => pig_version.version, 
        :project_script_path => script.rel_path,
        :parameters => pig_parameters,
        :notify_on_job_finish => notify_on_job_finish,
        :is_control_script => is_control_script).body
    end
  end
  
  display("job_id: #{response['job_id']}")
  display
  display("Job status can be viewed on the web at:\n\n #{response['web_job_url']}")
  display
  display("Or by running:\n\n  mortar jobs:status #{response['job_id']} --poll")
  display

  response['job_id']
end

#status ⇒ Object

jobs:status JOB_ID

Check the status of a job.

-p, --poll   # Poll the status of a job
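
Examples:

Check a job's status once (JOB_ID is a placeholder for a real job ID):
     $ mortar jobs:status JOB_ID

Poll until the job completes, as also suggested in the output of jobs:run:
     $ mortar jobs:status JOB_ID --poll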



# File 'lib/mortar/command/jobs.rb', line 205

def status
  job_id = shift_argument
  unless job_id
    error("Usage: mortar jobs:status JOB_ID\nMust specify JOB_ID.")
  end
  validate_arguments!

  def pig_job_display_entries(job_status)
    job_display_entries = {
      "status" => job_status["status_description"],
      "cluster_id" => job_status["cluster_id"],
      "job submitted at" => job_status["start_timestamp"],
      "job began running at" => job_status["running_timestamp"],
      "job finished at" => job_status["stop_timestamp"],
      "job running for" => job_status["duration"],
      "job run with parameters" => job_status["parameters"],
    }
  end

  def luigi_job_display_entries(job_status)
    status = job_status['status_description']
    if !job_status['status_detail_description'].nil?
      status += " - #{job_status['status_detail_description']}"
    end

    job_display_entries = {
      "status" => status,
      "job submitted at" => job_status["start_timestamp"],
      "job began running at" => job_status["running_timestamp"],
      "job finished at" => job_status["stop_timestamp"],
      "job running for" => job_status["duration"],
      "job run with parameters" => job_status["parameters"],
    }
  end

  def add_pig_job_fields(job_status, job_display_entries)
    if job_status["outputs"] && job_status["outputs"].length > 0
      job_display_entries["outputs"] = Hash.new { |h,k| h[k] = [] }
      job_status["outputs"].select{|o| o["alias"]}.collect{ |output|
        output_hash = {}
        output_hash["location"] = output["location"] if output["location"]
        output_hash["records"] = output["records"] if output["records"]
        [output['alias'], output_hash]
      }.each{ |k,v| job_display_entries["outputs"][k] << v }
    end

    if job_status["num_hadoop_jobs"] && job_status["num_hadoop_jobs_succeeded"]
      job_display_entries["progress"] = "#{job_status["progress"]}%"
      job_display_entries["hadoop jobs complete"] = 
        '%0.2f / %0.2f' % [job_status["num_hadoop_jobs_succeeded"], job_status["num_hadoop_jobs"]]
    elsif job_status["num_hadoop_jobs_succeeded"]
      job_display_entries["progress"] = '%0.2f MapReduce Jobs complete.' % job_status["num_hadoop_jobs_succeeded"]
    else
      job_display_entries["progress"] = "#{job_status["progress"]}%"
    end
  end
  
  # Inner function to display the hash table when the job is complete
  def display_job_status(job_status)
    if (job_status['job_type'] == Mortar::API::Jobs::JOB_TYPE_LUIGI)
      job_display_entries = luigi_job_display_entries(job_status)
    else
      job_display_entries = pig_job_display_entries(job_status)
      add_pig_job_fields(job_status, job_display_entries)
    end
    
    unless job_status["error"].nil? || job_status["error"]["message"].nil?
      error_context = get_error_message_context(job_status["error"]["message"])
      unless error_context == ""
        job_status["error"]["help"] = error_context
      end
      job_status["error"].each_pair do |key, value|
        job_display_entries["error - #{key}"] = value
      end
    end

    styled_header("#{job_status["display_name"]} (job_id: #{job_status["job_id"]})")
    styled_hash(job_display_entries)
  end
  
  # If polling the status
  if options[:poll]
    job_status = nil
    ticking(polling_interval) do |ticks|
      job_status = api.get_job(job_id).body
      # If the job is complete exit and display the table normally 
      if Mortar::API::Jobs::STATUSES_COMPLETE.include?(job_status["status_code"] )
        redisplay("")
        display_job_status(job_status)
        break
      end

      # If not a Luigi job and job is running show the progress bar
      if job_status['job_type'] != Mortar::API::Jobs::JOB_TYPE_LUIGI and job_status["status_code"] == Mortar::API::Jobs::STATUS_RUNNING && job_status["num_hadoop_jobs"]
        progressbar = "=" + ("=" * (job_status["progress"].to_i / 5)) + ">"

        if job_status["num_hadoop_jobs"] && job_status["num_hadoop_jobs_succeeded"]
          hadoop_jobs_ratio_complete = 
            '%0.2f / %0.2f' % [job_status["num_hadoop_jobs_succeeded"], job_status["num_hadoop_jobs"]]
        end

        printf("\r[#{spinner(ticks)}] Status: [%-22s] %s%% Complete (%s MapReduce jobs finished)", progressbar, job_status["progress"], hadoop_jobs_ratio_complete)

      elsif job_status['job_type'] != Mortar::API::Jobs::JOB_TYPE_LUIGI and job_status["status_code"] == Mortar::API::Jobs::STATUS_RUNNING
        jobs_complete = '%0.2f' % job_status["num_hadoop_jobs_succeeded"]
        printf("\r[#{spinner(ticks)}] #{jobs_complete} MapReduce Jobs complete.")

      # If the job is not complete, but not in the running state, just display its status
      else
        job_display_status = job_status['status_description']
        if !job_status['status_details'].nil?
          job_display_status += " - #{job_status['status_details']}"
        end
        if !job_status['status_detail_description'].nil?
          job_display_status += " - #{job_status['status_detail_description']}"
        end
        redisplay("[#{spinner(ticks)}] Status: #{job_display_status}")
      end
    end
    job_status
  # If not polling, get the job status and display the results
  else
    job_status = api.get_job(job_id).body
    display_job_status(job_status)
    job_status
  end
end
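
For reference, a standalone sketch of the progress-bar string built in the polling branch above (the progress value is illustrative):

  progress = 40
  progressbar = "=" + ("=" * (progress.to_i / 5)) + ">"
  # => "=========>"  (printed inside the 22-character field of the printf above)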

#stop ⇒ Object

jobs:stop JOB_ID

Stop a running job.
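
Examples:

Stop a job by its ID (JOB_ID is a placeholder for a real job ID):
     $ mortar jobs:stop JOB_ID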



# File 'lib/mortar/command/jobs.rb', line 337

def stop
  job_id = shift_argument
  unless job_id
    error("Usage: mortar jobs:stop JOB_ID\nMust specify JOB_ID.")
  end
  validate_arguments!

  response = api.stop_job(job_id).body  

  #TODO: jkarn - Once all servers have the additional message field we can remove this check.
  if response['message'].nil?
    display("Stopping job #{job_id}.")
  else
    display(response['message'])
  end
end