Module: Legion::Runner::Status

Defined in:
lib/legion/runner/status.rb

Class Method Summary collapse

Class Method Details

.generate_task_id(runner_class:, function:, status: 'task.queued', **opts) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/legion/runner/status.rb', line 40

def self.generate_task_id(runner_class:, function:, status: 'task.queued', **opts)
  Legion::Logging.debug "Legion::Runner::Status.generate_task_id called, #{runner_class}, #{function}, status: #{status}, #{opts}"
  return nil unless Legion::Settings[:data][:connected]

  runner = Legion::Data::Model::Runner.where(namespace: runner_class.to_s.downcase).first
  return nil if runner.nil?

  function = Legion::Data::Model::Function.where(runner_id: runner.values[:id], name: function).first
  return nil if function.nil?

  insert = { status: status, function_id: function.values[:id] }
  insert[:parent_id] = opts[:task_id] if opts.key? :task_id
  insert[:master_id] = opts[:task_id] if opts.key? :task_id
  insert[:payload] = Legion::JSON.dump(opts[:payload]) if opts.key? :payload

  %i[function_args master_id parent_id relationship_id].each do |column|
    next unless opts.key? column

    insert[column] = opts[column].is_a?(Hash) ? Legion::JSON.dump(opts[column]) : opts[column]
  end

  { success: true, task_id: Legion::Data::Model::Task.insert(insert), **insert }
rescue StandardError => e
  Legion::Logging.error e.message
  Legion::Logging.error e.backtrace
  raise(e)
end

.update(task_id:, status: 'task.completed', **opts) ⇒ Object



4
5
6
7
8
9
10
11
12
13
# File 'lib/legion/runner/status.rb', line 4

def self.update(task_id:, status: 'task.completed', **opts)
  Legion::Logging.debug "Legion::Runner::Status.update called, #{task_id}, status: #{status}, #{opts}"
  return if status.nil?

  if Legion::Settings[:data][:connected]
    update_db(task_id: task_id, status: status, **opts)
  else
    update_rmq(task_id: task_id, status: status, **opts)
  end
end

.update_db(task_id:, status: 'task.completed', **opts) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
# File 'lib/legion/runner/status.rb', line 28

def self.update_db(task_id:, status: 'task.completed', **opts)
  return if status.nil?

  task = Legion::Data::Model::Task[task_id]
  task.update(status: status)
rescue StandardError => e
  Legion::Logging.warn e.message
  Legion::Logging.warn 'Legion::Runner.update_status_db failed, defaulting to rabbitmq'
  Legion::Logging.warn e.backtrace
  update_rmq(task_id: task_id, status: status, **opts)
end

.update_rmq(task_id:, status: 'task.completed', **opts) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/legion/runner/status.rb', line 15

def self.update_rmq(task_id:, status: 'task.completed', **opts)
  return if status.nil?

  Legion::Transport::Messages::TaskUpdate.new(task_id: task_id, status: status, **opts).publish
rescue StandardError => e
  Legion::Logging.fatal e.message
  Legion::Logging.fatal e.backtrace
  retries ||= 0
  Legion::Logging.fatal 'Will retry in 3 seconds' if retries < 5
  sleep(3)
  retry if (retries += 1) < 5
end