Class: UpdateTaskStatusJob

Inherits:
ContainerBrokerBaseJob show all
Extended by:
RequestIdFromTask
Defined in:
app/jobs/update_task_status_job.rb

Defined Under Namespace

Classes: InvalidContainerStatusError

Constant Summary

Constants inherited from ContainerBrokerBaseJob

ContainerBrokerBaseJob::JOB_METRIC

Instance Method Summary collapse

Methods included from RequestIdFromTask

request_id_from_args

Methods inherited from ContainerBrokerBaseJob

request_id_from_args

Instance Method Details

#add_metric(task) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'app/jobs/update_task_status_job.rb', line 54

# Reports one data point describing +task+ via <tt>Metrics.new("tasks").count</tt>.
#
# +task.tags+, +task.slot+ and the slot's node are read with safe navigation,
# so a nil value there is reported as nil instead of raising. +task+ itself
# must not be nil, since +task.id+ is read unconditionally.
#
# @param task the task whose attributes are reported
# @return whatever +Metrics#count+ returns
def add_metric(task)
  tasks_metric = Metrics.new("tasks")
  task_id = task.id
  tags = task.tags
  slot = task.slot

  tasks_metric.count(
    task_id: task_id,
    event_id: tags&.dig("event_id"),
    # nil.to_i is 0, so a task without an "api_id" tag is reported as 0.
    api_id: tags&.dig("api_id").to_i,
    name: task.name,
    type: task.execution_type,
    slot: slot&.name,
    node: slot&.node&.name,
    started_at: task.started_at,
    finished_at: task.finished_at,
    duration: task.milliseconds_running,
    processing_time: task.seconds_running.to_i,
    error: task.error,
    status: task.status
  )
end

#perform(task) ⇒ Object



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'app/jobs/update_task_status_job.rb', line 10

# Copies the final state of the runner that executed +task+ onto the task.
#
# Steps, in order:
# 1. Fetch the execution info through the :fetch_execution_info runner
#    service of the node that owns the task's slot.
# 2. Refuse to continue unless that execution info reports terminated.
# 3. Copy exit code and start/finish timestamps onto the task.
# 4. Persist container logs (see #persist_logs).
# 5. Mark the task completed on success, otherwise mark it for retry with
#    the runner's error.
# 6. Save the task and emit a metric (see #add_metric).
#
# @param task the task whose runner has (supposedly) finished
# @raise [InvalidContainerStatusError] when the runner is not terminated
# @return whatever #add_metric returns
def perform(task)
  log = Rails.logger
  log.debug("Updating status for task #{task}")
  log.debug("Task #{task} is running in slot #{task.slot}")

  info_service = task.slot.node.runner_service(:fetch_execution_info)
  runner = info_service.perform(task: task)

  log.debug("Got runner #{runner.id} with state #{runner.status}")

  if runner.terminated?
    log.debug("Container is in status #{runner.status} and exit code #{runner.exit_code}")
  else
    # Nothing on the task has been modified yet when this is raised.
    log.debug("Runner should be terminated but it is #{runner.status}. Execution info is #{runner.to_h}")
    raise InvalidContainerStatusError, "Runner should be terminated (current status: #{runner.status})"
  end

  task.exit_code = runner.exit_code
  task.started_at = runner.started_at
  task.finished_at = runner.finished_at

  # Logs are collected before the status transition below.
  persist_logs(task)

  if runner.success?
    log.debug("Marking task as completed and no errors")
    task.error = nil
    task.completed!
  else
    log.debug("Marked task for retry and set error as #{runner.error}")
    task.mark_as_retry(error: runner.error)
  end

  task.save!

  add_metric(task)
end

#persist_logs(task) ⇒ Object



46
47
48
49
50
51
52
# File 'app/jobs/update_task_status_job.rb', line 46

# Fetches the container logs for +task+ and hands them to +task.set_logs+.
#
# Does nothing (and returns nil) when +task.persist_logs+ is falsy. The logs
# come from the :fetch_logs runner service of the node that owns the task's
# slot.
#
# @param task the task whose logs should be stored
# @return nil when log persistence is disabled, otherwise the result of
#   +task.set_logs+
def persist_logs(task)
  return unless task.persist_logs

  Rails.logger.debug("Persisting logs for #{task}")

  logs_service = task.slot.node.runner_service(:fetch_logs)
  task.set_logs(logs_service.perform(task: task))
end