Class: Pmux::Job

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/pmux/job.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(params, files) ⇒ Job

Returns a new instance of Job.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/pmux/job.rb', line 12

# Sets up a job from its parameters and the list of input files.
#
# params - Hash of job options. :num_r is read (defaulting to 0) and
#          :job_name is filled in with the double-quoted :mapper value
#          when absent; note that this writes into the hash passed in.
# files  - collection handed to mk_tasks, which builds one task per entry.
def initialize(params, files)
  @params = params
  @files = files

  # mk_tasks reads and updates the counter and the pending-task table,
  # so they have to exist before it is called.
  @task_id = 0
  @taskhash = {}
  @done_taskhash = {}
  @num_r = @params[:num_r] || 0
  @params[:job_name] ||= %Q{"#{@params[:mapper]}"}

  @tasks = mk_tasks(files)
  map_count = @tasks.size
  @num_t = map_count + @num_r

  # Bookkeeping values later reported by to_jlheader.
  @h = {
    :job_started_at=>Time.now,
    :map_tasks=>map_count,
    :reduce_tasks=>@num_r,
  }
  @failed = false
end

Instance Attribute Details

#failed ⇒ Object (readonly)

Returns the value of attribute failed.



10
11
12
# File 'lib/pmux/job.rb', line 10

# Reader for @failed (set to false by initialize and to true by set_failed).
def failed; @failed; end

#num_r ⇒ Object (readonly)

Returns the value of attribute num_r.



7
8
9
# File 'lib/pmux/job.rb', line 7

# Reader for @num_r, the reducer count taken from params[:num_r] (0 if absent).
def num_r; @num_r; end

#num_t ⇒ Object (readonly)

Returns the value of attribute num_t.



7
8
9
# File 'lib/pmux/job.rb', line 7

# Reader for @num_t, computed in initialize as the task count plus @num_r.
def num_t; @num_t; end

#reducers ⇒ Object (readonly)

Returns the value of attribute reducers.



9
10
11
# File 'lib/pmux/job.rb', line 9

# Reader for @reducers. The value is assigned by mk_reducer_addrs, so it is
# nil until that method has been called.
def reducers; @reducers; end

#taskhash ⇒ Object (readonly)

Returns the value of attribute taskhash.



8
9
10
# File 'lib/pmux/job.rb', line 8

# Reader for @taskhash, the table of pending tasks keyed by task id.
def taskhash; @taskhash; end

#tasks ⇒ Object (readonly)

Returns the value of attribute tasks.



7
8
9
# File 'lib/pmux/job.rb', line 7

# Reader for @tasks: the tasks built by mk_tasks in initialize, later
# replaced by the reduce tasks when mk_reduce_tasks runs.
def tasks; @tasks; end

Instance Method Details

#completed? ⇒ Boolean

Returns:

  • (Boolean)


84
85
86
# File 'lib/pmux/job.rb', line 84

# True when no pending task is left in @taskhash. Because set_failed also
# empties that table, a failed job reports true here as well.
def completed?
  @taskhash.size.zero?
end

#delete_task_by_id(task_id) ⇒ Object



76
77
78
79
80
81
82
# File 'lib/pmux/job.rb', line 76

# Retires a pending task: marks it :DONE, records it in @done_taskhash and
# removes it from @taskhash.
#
# Returns the retired task, or nil when no pending task has that id.
def delete_task_by_id(task_id)
  finished = @taskhash[task_id]
  return nil unless finished

  finished[:status] = :DONE
  @done_taskhash[task_id] = finished
  @taskhash.delete(task_id)
end

#get_task_by_id(task_id) ⇒ Object



72
73
74
# File 'lib/pmux/job.rb', line 72

# Looks up a pending task in @taskhash by its id; tasks already moved out by
# delete_task_by_id are no longer found here.
def get_task_by_id(task_id)
  pending = @taskhash
  pending[task_id]
end

#id ⇒ Object



68
69
70
# File 'lib/pmux/job.rb', line 68

# Job identifier: the absolute value of this object's object_id, as a String.
def id
  raw_id = object_id
  raw_id.abs.to_s
end

#mk_reduce_tasks ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/pmux/job.rb', line 51

# Builds one reduce task per entry of +reducers+ and makes them the job's
# current task list.
#
# Each task gets the next task id, its position in +reducers+ as :pindex,
# the reducer node address and the :reducer command from the job params.
# Every task is also registered in @taskhash as pending.
#
# Returns the new task Array (also stored in @tasks, replacing the previous
# contents).
def mk_reduce_tasks
  # The job id does not change between tasks, so look it up once.
  job_id = id
  @tasks = reducers.each_with_index.map do |reducer_node_addr, pindex|
    @task_id += 1
    task = {:pindex=>pindex, :job_id=>job_id, :task_id=>@task_id,
      :node_addr=>reducer_node_addr,
      :reducer=>@params[:reducer],
      :hist=>[],
    }
    @taskhash[@task_id] = task
    task
  end
end

#mk_reducer_addrs(addrs, num_r = nil) ⇒ Object



44
45
46
47
48
49
# File 'lib/pmux/job.rb', line 44

# Chooses the node addresses that will run the reducers, spreading them
# evenly over +addrs+.
#
# addrs - Array of candidate node addresses.
# num_r - number of reducers to place; defaults to @num_r.
#
# Reducer i is assigned addrs[floor(i * addrs.size / num_r)]. The index is
# computed with integer arithmetic: a Float step (size / num_r) multiplied
# back by i can land just below a whole number and be truncated to the
# previous slot (e.g. (2.0 / 98) * 49 is 0.9999999999999999).
#
# When there are more reducers than addresses, addresses repeat; a num_r of
# 0 yields an empty list.
#
# Returns the chosen addresses (also stored in @reducers).
def mk_reducer_addrs addrs, num_r=nil
  num_r ||= @num_r
  total = addrs.size
  @reducers = (0...num_r).map { |ind| addrs[ind * total / num_r] }
end

#mk_tasks(files) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/pmux/job.rb', line 30

# Builds one task Hash per entry of +files+.
#
# Each task receives the next task id and carries the job id, the file, and
# the :mapper, :ff and :separator job params together with @num_r. Every
# task is registered in @taskhash as pending.
#
# Returns the Array of task Hashes, in the order of +files+.
def mk_tasks(files)
  owner_id = id
  files.map do |path|
    @task_id += 1
    new_task = {:job_id=>owner_id, :task_id=>@task_id,
      :file=>path,
      :mapper=>@params[:mapper],
      :num_r=>@num_r, :ff=>@params[:ff],
      :separator=>@params[:separator],
      :hist=>[],
    }
    @taskhash[@task_id] = new_task
    new_task
  end
end

#set_failed ⇒ Object



88
89
90
91
# File 'lib/pmux/job.rb', line 88

# Marks the job as failed and drops every pending task, which also makes
# completed? report true.
def set_failed
  pending = @taskhash
  pending.clear
  @failed = true
end

#to_jlfooter ⇒ Object



104
105
106
# File 'lib/pmux/job.rb', line 104

# Footer record for the job: a Hash holding only :job_finished_at, copied
# from @h (nil when that value has not been recorded).
def to_jlfooter
  finished_at = @h[:job_finished_at]
  {:job_finished_at=>finished_at}
end

#to_jlheader ⇒ Object



93
94
95
96
97
98
99
100
101
102
# File 'lib/pmux/job.rb', line 93

# Header record for the job: a Hash combining the job id, the first input
# file, the current size of @tasks, the full params Hash, the timing and
# task-count values kept in @h, and selected params repeated at top level.
#
# Note that :num_r is read from the params, not from @num_r, so it is nil
# when the caller did not supply it.
def to_jlheader
  stats = @h
  opts = @params
  {
    :id=>id,
    :files_first=>@files.first,
    :tasksize=>@tasks.size,
    :params=>opts,
    :invoked_at=>stats[:invoked_at],
    :job_started_at=>stats[:job_started_at],
    :map_tasks=>stats[:map_tasks],
    :reduce_tasks=>stats[:reduce_tasks],
    :storage_name=>opts[:storage_name],
    :mapper=>opts[:mapper],
    :reducer=>opts[:reducer],
    :num_r=>opts[:num_r],
  }
end