Module: Legion::Runner::Status
- Defined in:
- lib/legion/runner/status.rb
Class Method Summary
- .generate_task_id(runner_class:, function:, status: 'task.queued', **opts) ⇒ Object
- .update(task_id:, status: 'task.completed', **opts) ⇒ Object
- .update_db(task_id:, status: 'task.completed', **opts) ⇒ Object
- .update_rmq(task_id:, status: 'task.completed', **opts) ⇒ Object
Class Method Details
.generate_task_id(runner_class:, function:, status: 'task.queued', **opts) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/legion/runner/status.rb', line 40
#
# Creates a task row for the given runner/function pair and returns its id
# together with the inserted column values.
#
# @param runner_class [#to_s] runner namespace; it is downcased before lookup
# @param function [Object] function name used to look up the function row
# @param status [String] initial task status
# @param opts [Hash] optional columns. Recognised keys: :task_id (copied to
#   both :parent_id and :master_id), :payload (always JSON-dumped),
#   :function_args, :master_id, :parent_id, :relationship_id (JSON-dumped
#   only when the value is a Hash). An explicit :master_id / :parent_id
#   overrides the value derived from :task_id.
# @return [Hash, nil] `{ success: true, task_id: <insert result>, **columns }`,
#   or nil when data is not connected or the runner/function row is not found
# @raise [StandardError] any error raised during lookup/insert is logged and
#   re-raised unchanged
def self.generate_task_id(runner_class:, function:, status: 'task.queued', **opts)
  Legion::Logging.debug "Legion::Runner::Status.generate_task_id called, #{runner_class}, #{function}, status: #{status}, #{opts}"

  # A missing :data section is treated the same as "not connected".
  data_settings = Legion::Settings[:data]
  return nil unless data_settings && data_settings[:connected]

  runner = Legion::Data::Model::Runner.where(namespace: runner_class.to_s.downcase).first
  return nil if runner.nil?

  function_row = Legion::Data::Model::Function.where(runner_id: runner.values[:id], name: function).first
  return nil if function_row.nil?

  insert = { status: status, function_id: function_row.values[:id] }
  if opts.key?(:task_id)
    insert[:parent_id] = opts[:task_id]
    insert[:master_id] = opts[:task_id]
  end
  insert[:payload] = Legion::JSON.dump(opts[:payload]) if opts.key?(:payload)

  # Explicit columns win over the :task_id-derived defaults set above.
  %i[function_args master_id parent_id relationship_id].each do |column|
    next unless opts.key?(column)

    value = opts[column]
    insert[column] = value.is_a?(Hash) ? Legion::JSON.dump(value) : value
  end

  { success: true, task_id: Legion::Data::Model::Task.insert(insert), **insert }
rescue StandardError => e
  Legion::Logging.error e.message
  Legion::Logging.error e.backtrace
  raise
end
.update(task_id:, status: 'task.completed', **opts) ⇒ Object
4 5 6 7 8 9 10 11 12 13 |
# File 'lib/legion/runner/status.rb', line 4
#
# Records a task status change, writing to the database when the data layer
# reports itself connected and publishing over RabbitMQ otherwise.
#
# @param task_id [Object] identifier of the task being updated
# @param status [String, nil] new status; nil makes the call a no-op
# @param opts [Hash] extra keyword arguments forwarded to the chosen backend
# @return [Object, nil] whatever update_db / update_rmq returns, or nil when
#   status is nil
def self.update(task_id:, status: 'task.completed', **opts)
  Legion::Logging.debug "Legion::Runner::Status.update called, #{task_id}, status: #{status}, #{opts}"
  return if status.nil?

  # A missing :data section is treated as "not connected" so the update is
  # still delivered over RabbitMQ instead of failing on nil[:connected].
  data_settings = Legion::Settings[:data]
  if data_settings && data_settings[:connected]
    update_db(task_id: task_id, status: status, **opts)
  else
    update_rmq(task_id: task_id, status: status, **opts)
  end
end
.update_db(task_id:, status: 'task.completed', **opts) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/legion/runner/status.rb', line 28
#
# Writes the new status onto the task row looked up by task_id. Any
# StandardError (including a failed lookup that yields nil) is logged at warn
# level and the update is handed to update_rmq instead.
#
# @param task_id [Object] key passed to Legion::Data::Model::Task[]
# @param status [String, nil] new status; nil makes the call a no-op
# @param opts [Hash] extra keyword arguments, only used by the RabbitMQ fallback
# @return [Object, nil] result of the row update, result of update_rmq on
#   failure, or nil when status is nil
def self.update_db(task_id:, status: 'task.completed', **opts)
  return if status.nil?

  task = Legion::Data::Model::Task[task_id]
  task.update(status: status)
rescue StandardError => e
  Legion::Logging.warn e.message
  Legion::Logging.warn 'Legion::Runner.update_status_db failed, defaulting to rabbitmq'
  Legion::Logging.warn e.backtrace
  update_rmq(task_id: task_id, status: status, **opts)
end
.update_rmq(task_id:, status: 'task.completed', **opts) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/legion/runner/status.rb', line 15
#
# Publishes a Legion::Transport::Messages::TaskUpdate carrying the new status.
# A StandardError during build/publish is logged at fatal level and the whole
# attempt is repeated, up to 5 attempts in total with a 3 second pause between
# them. After the final failed attempt the error is swallowed and nil returned.
#
# @param task_id [Object] identifier of the task being updated
# @param status [String, nil] new status; nil makes the call a no-op
# @param opts [Hash] extra keyword arguments forwarded to the message
# @return [Object, nil] result of #publish, or nil when status is nil or every
#   attempt failed
def self.update_rmq(task_id:, status: 'task.completed', **opts)
  return if status.nil?

  Legion::Transport::Messages::TaskUpdate.new(task_id: task_id, status: status, **opts).publish
rescue StandardError => e
  Legion::Logging.fatal e.message
  Legion::Logging.fatal e.backtrace

  # `retries` is a method-level local, so it keeps its value across `retry`.
  retries ||= 0
  retries += 1
  # Out of attempts: give up now rather than announcing (and sleeping for)
  # a retry that will never happen.
  return if retries >= 5

  Legion::Logging.fatal 'Will retry in 3 seconds'
  sleep(3)
  retry
end