Class: Pmux::Job
Instance Attribute Summary collapse
-
#failed ⇒ Object
readonly
Returns the value of attribute failed.
-
#num_r ⇒ Object
readonly
Returns the value of attribute num_r.
-
#num_t ⇒ Object
readonly
Returns the value of attribute num_t.
-
#reducers ⇒ Object
readonly
Returns the value of attribute reducers.
-
#taskhash ⇒ Object
readonly
Returns the value of attribute taskhash.
-
#tasks ⇒ Object
readonly
Returns the value of attribute tasks.
Instance Method Summary collapse
- #completed? ⇒ Boolean
- #delete_task_by_id(task_id) ⇒ Object
- #get_task_by_id(task_id) ⇒ Object
- #id ⇒ Object
-
#initialize(params, files) ⇒ Job
constructor
A new instance of Job.
- #mk_reduce_tasks ⇒ Object
- #mk_reducer_addrs(addrs, num_r = nil) ⇒ Object
- #mk_tasks(files) ⇒ Object
- #set_failed ⇒ Object
- #to_jlfooter ⇒ Object
- #to_jlheader ⇒ Object
Constructor Details
#initialize(params, files) ⇒ Job
Returns a new instance of Job.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/pmux/job.rb', line 12 def initialize params, files @params = params @files = files @task_id = 0 @num_r = @params[:num_r] || 0 @params[:job_name] ||= %Q{"#{@params[:mapper]}"} @taskhash = {} @done_taskhash = {} @tasks = mk_tasks files @num_t = @tasks.size + @num_r @h = {:job_started_at=>Time.now, :map_tasks=>@tasks.size, :reduce_tasks=>@num_r, } @failed = false end |
Instance Attribute Details
#failed ⇒ Object (readonly)
Returns the value of attribute failed.
10 11 12 |
# File 'lib/pmux/job.rb', line 10 def failed @failed end |
#num_r ⇒ Object (readonly)
Returns the value of attribute num_r.
7 8 9 |
# File 'lib/pmux/job.rb', line 7 def num_r @num_r end |
#num_t ⇒ Object (readonly)
Returns the value of attribute num_t.
7 8 9 |
# File 'lib/pmux/job.rb', line 7 def num_t @num_t end |
#reducers ⇒ Object (readonly)
Returns the value of attribute reducers.
9 10 11 |
# File 'lib/pmux/job.rb', line 9 def reducers @reducers end |
#taskhash ⇒ Object (readonly)
Returns the value of attribute taskhash.
8 9 10 |
# File 'lib/pmux/job.rb', line 8 def taskhash @taskhash end |
#tasks ⇒ Object (readonly)
Returns the value of attribute tasks.
7 8 9 |
# File 'lib/pmux/job.rb', line 7 def tasks @tasks end |
Instance Method Details
#completed? ⇒ Boolean
84 85 86 |
# File 'lib/pmux/job.rb', line 84 def completed? @taskhash.empty? end |
#delete_task_by_id(task_id) ⇒ Object
76 77 78 79 80 81 82 |
# File 'lib/pmux/job.rb', line 76 def delete_task_by_id task_id if (task = @taskhash[task_id]) task[:status] = :DONE @done_taskhash[task_id] = task @taskhash.delete task_id end end |
#get_task_by_id(task_id) ⇒ Object
72 73 74 |
# File 'lib/pmux/job.rb', line 72 def get_task_by_id task_id @taskhash[task_id] end |
#id ⇒ Object
68 69 70 |
# File 'lib/pmux/job.rb', line 68 def id self.object_id.abs.to_s end |
#mk_reduce_tasks ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/pmux/job.rb', line 51 def mk_reduce_tasks pindex = 0 job_id = self.id @tasks = reducers.map {|reducer_node_addr| @task_id += 1 #task = make_reduce_task pindex, reducer_node_addr task = {:pindex=>pindex, :job_id=>self.id, :task_id=>@task_id, :node_addr=>reducer_node_addr, :reducer=>@params[:reducer], :hist=>[], } @taskhash[@task_id] = task pindex += 1 task } end |
#mk_reducer_addrs(addrs, num_r = nil) ⇒ Object
44 45 46 47 48 49 |
# File 'lib/pmux/job.rb', line 44 def mk_reducer_addrs addrs, num_r=nil num_r ||= @num_r step = addrs.size.to_f / num_r @reducers = (0..num_r-1).map {|ind| addrs[step*ind]} @reducers end |
#mk_tasks(files) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/pmux/job.rb', line 30 def mk_tasks files job_id = self.id files.map {|file| @task_id += 1 @taskhash[@task_id] = {:job_id=>job_id, :task_id=>@task_id, :file=>file, :mapper=>@params[:mapper], #:reducer=>@params[:reducer], :num_r=>@num_r, :ff=>@params[:ff], :separator=>@params[:separator], :hist=>[], } } end |
#set_failed ⇒ Object
88 89 90 91 |
# File 'lib/pmux/job.rb', line 88 def set_failed @taskhash.clear @failed = true end |
#to_jlfooter ⇒ Object
104 105 106 |
# File 'lib/pmux/job.rb', line 104 def h = {:job_finished_at=>@h[:job_finished_at]} end |
#to_jlheader ⇒ Object
93 94 95 96 97 98 99 100 101 102 |
# File 'lib/pmux/job.rb', line 93 def to_jlheader h = {:id=>id, :files_first=>@files.first, :tasksize=>@tasks.size, :params=>@params, :invoked_at=>@h[:invoked_at], :job_started_at=>@h[:job_started_at], :map_tasks=>@h[:map_tasks], :reduce_tasks=>@h[:reduce_tasks], :storage_name=>@params[:storage_name], :mapper=>@params[:mapper], :reducer=>@params[:reducer], :num_r=>@params[:num_r], } end |