Module: Legion::Extensions::Helpers::Task

Defined in:
lib/legion/extensions/helpers/task.rb

Instance Method Summary

Instance Method Details

#generate_task_id(function_id:, status: 'task.queued', **opts) ⇒ Object



47
48
49
50
51
52
53
54
55
56
# File 'lib/legion/extensions/helpers/task.rb', line 47

# Inserts a new task row through Legion::Data::Model::Task and reports the
# outcome together with the columns that were written.
#
# @param function_id [Object] value stored in the :function_id column
# @param status [String] value stored in the :status column
# @param opts [Hash] optional columns; only the keys below are read
# @option opts [Object] :payload serialized with Legion::JSON.dump into :payload
# @option opts [Object] :args serialized with Legion::JSON.dump into :function_args
# @option opts [Object] :master_id copied as-is when the key is present
# @option opts [Object] :parent_id copied as-is when the key is present
# @option opts [Object] :relationship_id copied as-is when the key is present
# @option opts [Object] :task_id copied as-is when the key is present
# @return [Hash] `success: true`, `task_id:` (the value returned by
#   Task.insert) and every inserted column
def generate_task_id(function_id:, status: 'task.queued', **opts)
  row = { status: status, function_id: function_id }

  # column name => option key whose value is stored as JSON
  { payload: :payload, function_args: :args }.each do |column, option|
    row[column] = Legion::JSON.dump(opts[option]) if opts.key?(option)
  end

  # Pass-through columns are copied only when the caller supplied the key,
  # so an explicit nil is still written.
  row.merge!(opts.slice(:master_id, :parent_id, :relationship_id, :task_id))

  new_id = Legion::Data::Model::Task.insert(row)
  # The row is splatted last, so a caller-supplied :task_id replaces new_id
  # in the returned hash.
  { success: true, task_id: new_id, **row }
end

#generate_task_log(task_id:, function:, runner_class: to_s, **payload) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/legion/extensions/helpers/task.rb', line 9

# Records a log entry for a task.
#
# When Legion::Settings[:data][:connected] is truthy, the entry is written
# directly through Legion::Data::Model::TaskLog. If the database is not
# connected, the insert returns a falsy value, or any StandardError is raised
# along the way, a Legion::Transport::Messages::TaskLog message is published
# instead.
#
# @param task_id [Object] id of the task the entry belongs to
# @param function [Object] function name used to look up the function row
# @param runner_class [Object] namespace used to look up the runner row;
#   defaults to the receiver's #to_s
# @param payload [Hash] remaining keywords; JSON-dumped for the database
#   path, passed as-is to the message path
# @return [true, Object] true after a successful insert, otherwise whatever
#   the message's #publish returns
def generate_task_log(task_id:, function:, runner_class: to_s, **payload)
  begin
    if Legion::Settings[:data][:connected]
      runner = Legion::Data::Model::Runner[namespace: runner_class]
      # A missing runner or function row raises on the nil receiver, which
      # lands in the rescue below and triggers the message fallback.
      function_row = Legion::Data::Model::Function.where(runner_id: runner.values[:id], name: function).first
      log_row = { task_id: task_id, function_id: function_row.values[:id], entry: Legion::JSON.dump(payload) }
      return true if Legion::Data::Model::TaskLog.insert(**log_row)
    end
  rescue StandardError => e
    log.warn e.backtrace
    log.warn("generate_task_log failed, reverting to rmq message, e: #{e.message}")
  end

  message = Legion::Transport::Messages::TaskLog.new(
    task_id: task_id, runner_class: runner_class, function: function, entry: payload
  )
  message.publish
end

#task_update(task_id, status, **opts) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/legion/extensions/helpers/task.rb', line 23

# Updates the status of a task.
#
# When Legion::Settings[:data][:connected] is truthy, the task row is loaded
# through Legion::Data::Model::Task and only its status is updated; +opts+
# are not written on this path. If the database is not connected or the
# update raises a StandardError, a Legion::Transport::Messages::TaskUpdate
# message is published instead, carrying the status plus any of the
# optional columns below.
#
# @param task_id [Object] id of the task; the method is a no-op when nil
# @param status [Object] new status; the method is a no-op when nil
# @param opts [Hash] optional values forwarded only on the message path
# @option opts [Object] :results
# @option opts [Object] :payload
# @option opts [Object] :function_args
# @return [nil, true, Object] nil when task_id or status is nil, true after
#   a database update, otherwise whatever the message's #publish returns
# @raise [StandardError] any error raised while building or publishing the
#   message is logged at fatal level and re-raised
def task_update(task_id, status, **opts)
  return if task_id.nil? || status.nil?

  begin
    if Legion::Settings[:data][:connected]
      # A missing task row raises on the nil receiver, which lands in the
      # rescue below and triggers the message fallback.
      Legion::Data::Model::Task[task_id].update(status: status)
      return true
    end
  rescue StandardError => e
    log.debug("task_update failed, reverting to rmq message, e: #{e.message}")
  end

  # Each forwarded column is listed once; Hash#slice keeps only the keys the
  # caller actually supplied, in this order.
  update_hash = { task_id: task_id, status: status }
  update_hash.merge!(opts.slice(:results, :payload, :function_args))
  Legion::Transport::Messages::TaskUpdate.new(**update_hash).publish
rescue StandardError => e
  log.fatal e.message
  log.fatal e.backtrace
  raise
end