Class: Executor::Job
- Inherits:
-
Object
- Object
- Executor::Job
- Defined in:
- lib/hyperflow-amqp-executor/job.rb
Instance Attribute Summary collapse
-
#metrics ⇒ Object
readonly
Returns the value of attribute metrics.
Instance Method Summary collapse
- #cmdline ⇒ Object
- #execute ⇒ Object
-
#initialize(id, job) ⇒ Job
constructor
A new instance of Job.
- #input_size ⇒ Object
- #output_size ⇒ Object
- #publish_events(name) ⇒ Object
- #run ⇒ Object
Constructor Details
#initialize(id, job) ⇒ Job
Returns a new instance of Job.
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/hyperflow-amqp-executor/job.rb', line 5

# Builds a job wrapper and mixes the selected storage backend into this
# instance.
#
# The storage name is taken from the job description first and falls back to
# the executor-wide setting when the job does not specify one.
#
# NOTE(review): the extracted listing read `@job..storage`, which is not a
# valid attribute access; it is restored here as `@job.storage`, matching the
# spelling used by the original error message. Confirm the attribute path
# against the job message schema.
#
# @param id  identifier of the job (used in log lines and event routing keys)
# @param job job description object; must respond to +storage+
# @raise [RuntimeError] when the resolved storage name is not recognised
def initialize(id, job)
  @job = job
  @id = id
  @metrics = {
    timestamps: {},
    executor: Executor::id
  }

  storage_name = @job.storage || Executor::settings.storage

  storage_module =
    case storage_name
    when 's3', 'cloud' then CloudStorage
    when 'local'       then LocalStorage
    when 'nfs'         then NFSStorage
    when 'plgdata'     then PLGDataStorage
    when 'gridftp'     then GridFTPStorage
    else
      # Report the name that was actually looked up, which may have come from
      # the executor settings rather than from the job itself.
      raise "Unknown storage #{storage_name}"
    end

  extend(storage_module)
end
Instance Attribute Details
#metrics ⇒ Object (readonly)
Returns the value of attribute metrics.
3 4 5 |
# File 'lib/hyperflow-amqp-executor/job.rb', line 3

# Read-only accessor for the metrics hash collected for this job.
#
# @return [Hash] the object stored in +@metrics+
def metrics
  @metrics
end
Instance Method Details
#cmdline ⇒ Object
81 82 83 84 85 86 87 |
# File 'lib/hyperflow-amqp-executor/job.rb', line 81

# Builds the command to run for this job.
#
# @return [Array<String>] executable followed by each argument (all converted
#   with +to_s+) when the job's args are an Array
# @return [String] "<executable> <args>" when the args are anything else
def cmdline
  args = @job.args
  return "#{@job.executable} #{args}" unless args.is_a?(Array)

  [@job.executable, *args].map(&:to_s)
end
#execute ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/hyperflow-amqp-executor/job.rb', line 89

# Runs the job's command line with +@workdir+ as the working directory and
# captures its output.
#
# @return [Hash] on success +{exit_status:, stderr:, stdout:}+, where
#   +exit_status+ is the status object returned by +Open3.capture3+;
#   when launching the command fails, +{exit_status: -1, exceptions: [error]}+
def execute
  Executor::logger.debug "[#{@id}] Executing #{cmdline}"
  stdout, stderr, status = Open3.capture3(*cmdline, chdir: @workdir)
  { exit_status: status, stderr: stderr, stdout: stdout }
rescue StandardError => e
  # Only ordinary errors are turned into a failed-job result. Interrupt,
  # SystemExit and similar process-level exceptions must keep propagating so
  # that a shutdown request is not reported as a job failure.
  Executor::logger.error "[#{@id}] Error executing job: #{e}"
  Executor::logger.debug "[#{@id}] Backtrace\n#{Array(e.backtrace).join("\n")}"
  { exit_status: -1, exceptions: [e] }
end
#input_size ⇒ Object
102 103 104 |
# File 'lib/hyperflow-amqp-executor/job.rb', line 102

# Total size in bytes of the job's input files inside +@workdir+.
#
# A file whose size cannot be read (for example because it does not exist)
# counts as 0, and a job without an inputs list yields 0.
#
# @return [Integer]
def input_size
  Array(@job.inputs).inject(0) do |total, file|
    bytes = begin
      File.size(File.join(@workdir, file.name))
    rescue StandardError
      # Best effort: size metrics must never fail the job.
      0
    end
    total + bytes
  end
end
#output_size ⇒ Object
106 107 108 |
# File 'lib/hyperflow-amqp-executor/job.rb', line 106

# Total size in bytes of the job's output files inside +@workdir+.
#
# A file whose size cannot be read (for example because the job did not
# produce it) counts as 0, and a job without an outputs list yields 0.
#
# @return [Integer]
def output_size
  Array(@job.outputs).inject(0) do |total, file|
    bytes = begin
      File.size(File.join(@workdir, file.name))
    rescue StandardError
      # Best effort: size metrics must never fail the job.
      0
    end
    total + bytes
  end
end
#publish_events(name) ⇒ Object
74 75 76 77 78 79 |
# File 'lib/hyperflow-amqp-executor/job.rb', line 74

# Wraps a job phase with a "started" and a "finished" event.
#
# The value returned by each +Executor::publish_event+ call is stored under
# +@metrics[:timestamps]+ as "<name>.started" / "<name>.finished". Whatever
# the block returns is merged into the payload of the "finished" event (a
# nil/false result adds nothing) and is also the return value of this method.
#
# @param name [String] phase name used in event types and routing keys
# @yieldreturn [Hash, nil] extra payload for the "finished" event
# @return the block's result
def publish_events(name)
  event_type  = "job.#{name}"
  routing_key = "job.#{@id}.#{name}"

  @metrics[:timestamps]["#{name}.started"] = Executor::publish_event(
    "#{event_type}.started", "#{routing_key}.started",
    job: @id, thread: Thread.current.__id__
  )

  results = yield

  finished_payload = { job: @id, thread: Thread.current.__id__ }
  finished_payload = finished_payload.merge(results) if results
  @metrics[:timestamps]["#{name}.finished"] = Executor::publish_event(
    "#{event_type}.finished", "#{routing_key}.finished", finished_payload
  )

  results
end
#run ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/hyperflow-amqp-executor/job.rb', line 30 def run @metrics[:timestamps]["job.started"] = Executor::publish_event 'job.started', "job.#{@id}.started", job: @id, thread: Thread.current.__id__ @metrics[:thread] = Thread.current.__id__ results = {} workdir do |tmpdir| @workdir = tmpdir raise "Couldn't get workdir" unless @workdir storage_init if self.respond_to? :storage_init if self.respond_to? :stage_in publish_events "stage_in" do _ , @metrics[:stage_in] = time { stage_in } @metrics[:input_size] = input_size {bytes: @metrics[:input_size], time: @metrics[:stage_in]} end else @metrics[:input_size] = input_size end publish_events "execution" do results, @metrics[:execution] = time { execute } { executable: @job.executable, exit_status: results[:exit_status], time: @metrics[:execution] } end if self.respond_to? :stage_out publish_events "stage_out" do _, @metrics[:stage_out] = time { stage_out } @metrics[:output_size] = output_size { bytes: @metrics[:output_size], time: @metrics[:stage_out] } end else @metrics[:output_size] = output_size end end @metrics[:timestamps]["job.finished"] = Executor::publish_event 'job.finished', "job.#{@id}.finished", job: @id, executable: @job.executable, exit_status: results[:exit_status], metrics: @metrics, thread: Thread.current.__id__ results[:metrics] = @metrics results end |