Module: Legion::Extensions::Helpers::Task
- Defined in:
- lib/legion/extensions/helpers/task.rb
Instance Method Summary collapse
- #generate_task_id(function_id:, status: 'task.queued', **opts) ⇒ Object
- #generate_task_log(task_id:, function:, runner_class: to_s, **payload) ⇒ Object
- #task_update(task_id, status, use_database: true, **opts) ⇒ Object
Instance Method Details
#generate_task_id(function_id:, status: 'task.queued', **opts) ⇒ Object
47 48 49 50 51 52 53 54 55 56 |
# File 'lib/legion/extensions/helpers/task.rb', line 47 def generate_task_id(function_id:, status: 'task.queued', **opts) insert = { status: status, function_id: function_id } insert[:payload] = Legion::JSON.dump(opts[:payload]) if opts.key? :payload insert[:function_args] = Legion::JSON.dump(opts[:args]) if opts.key? :args %i[master_id parent_id relationship_id task_id].each do |column| insert[column] = opts[column] if opts.key? column end { success: true, task_id: Legion::Data::Model::Task.insert(insert), **insert } end |
#generate_task_log(task_id:, function:, runner_class: to_s, **payload) ⇒ Object
9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/legion/extensions/helpers/task.rb', line 9 def generate_task_log(task_id:, function:, runner_class: to_s, **payload) begin if Legion::Settings[:data][:connected] runner_id = Legion::Data::Model::Runner[namespace: runner_class].values[:id] function_id = Legion::Data::Model::Function.where(runner_id: runner_id, name: function).first.values[:id] return true if Legion::Data::Model::TaskLog.insert(task_id: task_id, function_id: function_id, entry: Legion::JSON.dump(payload)) end rescue StandardError => e log.warn e.backtrace log.warn("generate_task_log failed, reverting to rmq message, e: #{e.}") end Legion::Transport::Messages::TaskLog.new(task_id: task_id, runner_class: runner_class, function: function, entry: payload).publish end |
#task_update(task_id, status, use_database: true, **opts) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/legion/extensions/helpers/task.rb', line 23 def task_update(task_id, status, use_database: true, **opts) return if task_id.nil? || status.nil? begin if Legion::Settings[:data][:connected] && use_database task = Legion::Data::Model::Task[task_id] task.update(status: status) return true end rescue StandardError => e log.debug("task_update failed, reverting to rmq message, e: #{e.}") end update_hash = { task_id: task_id, status: status } %i[results payload function_args payload results].each do |column| update_hash[column] = opts[column] if opts.key? column end Legion::Transport::Messages::TaskUpdate.new(**update_hash).publish rescue StandardError => e log.fatal e. log.fatal e.backtrace raise e end |