Class: Kraps::Job

Inherits:
Object
  • Object
show all
Defined in:
lib/kraps/job.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker:) ⇒ Job

Returns a new instance of Job.



5
6
7
8
9
10
# File 'lib/kraps/job.rb', line 5

# Builds an empty job bound to +worker+.
#
# A new job starts with no steps, zero partitions and a HashPartitioner.
#
# @param worker [Object] stored in @worker; the step-building methods use
#   it as the default for their own +worker:+ keyword
def initialize(worker:)
  # Mutable pipeline state first ...
  @steps = []
  @partitions = 0

  # ... then the collaborators.
  @worker = worker
  @partitioner = HashPartitioner.new
end

Instance Attribute Details

#steps ⇒ Object (readonly)

Returns the value of attribute steps.



3
4
5
# File 'lib/kraps/job.rb', line 3

# Reader for the @steps instance variable.
#
# @return [Object] the current value of @steps (the constructor initializes
#   it to an empty Array and the step-building methods append to it)
def steps
  @steps
end

Instance Method Details

#combine(other_job, jobs: nil, worker: @worker, before: nil, &block) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/kraps/job.rb', line 84

# Returns a copy of this job (obtained from +fresh+) with a COMBINE step
# appended; the step is added to the copy, not to the receiver.
#
# @param other_job [#steps] job recorded as the step's +dependency+; the index
#   of its last step (steps.size - 1) is stored as +combine_step_index+
# @param jobs [Integer, nil] requested job count; the step receives the smaller
#   of +jobs+ and the current partition count, or the partition count when nil
# @param worker [Object] worker stored on the step (defaults to @worker)
# @param before [Object, nil] passed through to the step unchanged
# @param block [Proc] stored on the step as its +block+
# @return [Object] the copied job carrying the additional step
def combine(other_job, jobs: nil, worker: @worker, before: nil, &block)
  combined = fresh

  combined.instance_eval do
    # Evaluated inside the copy so that its @partitions is the one consulted.
    step_jobs = [jobs, @partitions].compact.min
    last_step_index = other_job.steps.size - 1

    @steps << Step.new(
      action: Actions::COMBINE,
      jobs: step_jobs,
      partitions: @partitions,
      partitioner: @partitioner,
      worker: worker,
      before: before,
      block: block,
      dependency: other_job,
      options: { combine_step_index: last_step_index }
    )
  end

  combined
end

#dump(prefix:, worker: @worker) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/kraps/job.rb', line 124

# Adds an each_partition step that serializes every partition and hands it to
# the Kraps driver.
#
# For each partition the pairs are written to a temporary file, one
# JSON document per line, and the rewound file is stored under
# "<prefix>/<partition>/chunk.json". The temporary file is closed and
# unlinked afterwards, even when writing or storing raises.
#
# @param prefix [String] leading path component of the stored chunk files
# @param worker [Object] worker passed to each_partition (defaults to @worker)
# @return [Object] whatever each_partition returns
def dump(prefix:, worker: @worker)
  each_partition(worker: worker) do |partition, pairs|
    chunk_file = Tempfile.new

    pairs.each { |pair| chunk_file.puts(JSON.generate(pair)) }

    # Rewind so the driver reads the chunk from its first byte.
    chunk_file.rewind

    destination = File.join(prefix, partition.to_s, "chunk.json")
    Kraps.driver.store(destination, chunk_file)
  ensure
    chunk_file&.close(true)
  end
end

#each_partition(jobs: nil, worker: @worker, before: nil, &block) ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/kraps/job.rb', line 102

# Returns a copy of this job (obtained from +fresh+) with an EACH_PARTITION
# step appended; the step is added to the copy, not to the receiver.
#
# @param jobs [Integer, nil] requested job count; the step receives the smaller
#   of +jobs+ and the current partition count, or the partition count when nil
# @param worker [Object] worker stored on the step (defaults to @worker)
# @param before [Object, nil] passed through to the step unchanged
# @param block [Proc] stored on the step as its +block+
# @return [Object] the copied job carrying the additional step
def each_partition(jobs: nil, worker: @worker, before: nil, &block)
  iterating = fresh

  iterating.instance_eval do
    # Evaluated inside the copy so that its @partitions is the one consulted.
    step_jobs = [jobs, @partitions].compact.min

    @steps << Step.new(
      action: Actions::EACH_PARTITION,
      jobs: step_jobs,
      partitions: @partitions,
      partitioner: @partitioner,
      worker: worker,
      before: before,
      block: block
    )
  end

  iterating
end

#fresh ⇒ Object



163
164
165
166
167
# File 'lib/kraps/job.rb', line 163

# Returns a shallow copy of this job that owns its own steps array, so
# appending a step to the copy leaves the receiver's @steps untouched.
#
# @return [Object] the duplicated job
def fresh
  copy = dup
  # dup shares the @steps array with the receiver; give the copy its own.
  copy.instance_variable_set(:@steps, @steps.dup)
  copy
end

#load(prefix:, partitions:, partitioner:, worker: @worker) ⇒ Object



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/kraps/job.rb', line 138

# Builds a job that reads back chunk files laid out as
# "<prefix>/<partition>/chunk.json" (the same layout +dump+ stores to).
#
# A parallelize step emits every partition number in 0...partitions, and a
# following map_partitions step downloads each partition's chunk through the
# Kraps driver, parses it line by line as JSON and emits the decoded
# key/value pairs. Partitions without a stored chunk are skipped.
#
# @param prefix [String] leading path component of the stored chunk files
# @param partitions [Integer] number of partitions to read back
# @param partitioner [Object] partitioner passed to the map_partitions step
# @param worker [Object] worker used for both steps (defaults to @worker)
# @return [Object] whatever map_partitions returns
def load(prefix:, partitions:, partitioner:, worker: @worker)
  # The partitioner returns the key itself; presumably this routes each
  # emitted partition number to the partition of the same index.
  job = parallelize(partitions: partitions, partitioner: proc { |key, _| key }, worker: worker) do |collector|
    (0...partitions).each do |partition|
      collector.call(partition)
    end
  end

  job.map_partitions(partitioner: partitioner, worker: worker) do |partition, _, collector|
    # Check for the chunk before allocating anything, so partitions without
    # stored data do not create (and immediately delete) a temporary file.
    path = File.join(prefix, partition.to_s, "chunk.json")
    next unless Kraps.driver.exists?(path)

    tempfile = Tempfile.new
    Kraps.driver.download(path, tempfile.path)

    tempfile.each_line do |line|
      key, value = JSON.parse(line)

      collector.call(key, value)
    end
  ensure
    # tempfile is still nil when the partition was skipped above.
    tempfile&.close(true)
  end
end

#map(partitions: nil, partitioner: nil, jobs: nil, worker: @worker, before: nil, &block) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/kraps/job.rb', line 30

# Returns a copy of this job (obtained from +fresh+) with a MAP step appended;
# the step is added to the copy, not to the receiver.
#
# @param partitions [Integer, nil] when given, replaces the copy's partition
#   count before the step is built
# @param partitioner [Object, nil] when given, replaces the copy's partitioner
#   before the step is built
# @param jobs [Integer, nil] requested job count; the step receives the smaller
#   of +jobs+ and the (possibly updated) partition count, or the partition
#   count when nil
# @param worker [Object] worker stored on the step (defaults to @worker)
# @param before [Object, nil] passed through to the step unchanged
# @param block [Proc] stored on the step as its +block+
# @return [Object] the copied job carrying the additional step
def map(partitions: nil, partitioner: nil, jobs: nil, worker: @worker, before: nil, &block)
  mapped = fresh

  mapped.instance_eval do
    # Overrides only apply when supplied; otherwise the copy keeps its values.
    @partitions = partitions if partitions
    @partitioner = partitioner if partitioner

    step_jobs = [jobs, @partitions].compact.min

    @steps << Step.new(
      action: Actions::MAP,
      jobs: step_jobs,
      partitions: @partitions,
      partitioner: @partitioner,
      worker: worker,
      before: before,
      block: block
    )
  end

  mapped
end

#map_partitions(partitions: nil, partitioner: nil, jobs: nil, worker: @worker, before: nil, &block) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/kraps/job.rb', line 49

# Returns a copy of this job (obtained from +fresh+) with a MAP_PARTITIONS
# step appended; the step is added to the copy, not to the receiver.
#
# @param partitions [Integer, nil] when given, replaces the copy's partition
#   count before the step is built
# @param partitioner [Object, nil] when given, replaces the copy's partitioner
#   before the step is built
# @param jobs [Integer, nil] requested job count; the step receives the smaller
#   of +jobs+ and the (possibly updated) partition count, or the partition
#   count when nil
# @param worker [Object] worker stored on the step (defaults to @worker)
# @param before [Object, nil] passed through to the step unchanged
# @param block [Proc] stored on the step as its +block+
# @return [Object] the copied job carrying the additional step
def map_partitions(partitions: nil, partitioner: nil, jobs: nil, worker: @worker, before: nil, &block)
  mapped = fresh

  mapped.instance_eval do
    # Overrides only apply when supplied; otherwise the copy keeps its values.
    @partitions = partitions if partitions
    @partitioner = partitioner if partitioner

    step_jobs = [jobs, @partitions].compact.min

    @steps << Step.new(
      action: Actions::MAP_PARTITIONS,
      jobs: step_jobs,
      partitions: @partitions,
      partitioner: @partitioner,
      worker: worker,
      before: before,
      block: block
    )
  end

  mapped
end

#parallelize(partitions:, partitioner: HashPartitioner.new, worker: @worker, before: nil, &block) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/kraps/job.rb', line 12

# Returns a copy of this job (obtained from +fresh+) whose partition count and
# partitioner are set to the given values and which has a PARALLELIZE step
# appended; the receiver itself is not modified.
#
# @param partitions [Integer] partition count for the copy and the step
# @param partitioner [Object] partitioner for the copy and the step
#   (a new HashPartitioner by default)
# @param worker [Object] worker stored on the step (defaults to @worker)
# @param before [Object, nil] passed through to the step unchanged
# @param block [Proc] stored on the step as its +block+
# @return [Object] the copied job carrying the additional step
def parallelize(partitions:, partitioner: HashPartitioner.new, worker: @worker, before: nil, &block)
  seeded = fresh

  seeded.instance_eval do
    @partitions = partitions
    @partitioner = partitioner

    # No jobs: value is passed for this step.
    @steps << Step.new(
      action: Actions::PARALLELIZE,
      partitions: partitions,
      partitioner: partitioner,
      worker: worker,
      before: before,
      block: block
    )
  end

  seeded
end

#reduce(jobs: nil, worker: @worker, before: nil, &block) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/kraps/job.rb', line 68

# Returns a copy of this job (obtained from +fresh+) with a REDUCE step
# appended; the step is added to the copy, not to the receiver.
#
# @param jobs [Integer, nil] requested job count; the step receives the smaller
#   of +jobs+ and the current partition count, or the partition count when nil
# @param worker [Object] worker stored on the step (defaults to @worker)
# @param before [Object, nil] passed through to the step unchanged
# @param block [Proc] stored on the step as its +block+
# @return [Object] the copied job carrying the additional step
def reduce(jobs: nil, worker: @worker, before: nil, &block)
  reduced = fresh

  reduced.instance_eval do
    # Evaluated inside the copy so that its @partitions is the one consulted.
    step_jobs = [jobs, @partitions].compact.min

    @steps << Step.new(
      action: Actions::REDUCE,
      jobs: step_jobs,
      partitions: @partitions,
      partitioner: @partitioner,
      worker: worker,
      before: before,
      block: block
    )
  end

  reduced
end

#repartition(partitions:, jobs: nil, partitioner: nil, worker: @worker, before: nil) ⇒ Object



118
119
120
121
122
# File 'lib/kraps/job.rb', line 118

# Adds a +map+ step whose block re-emits every key/value pair unchanged, so
# the only effect is the new partition count (and partitioner, if given).
#
# @param partitions [Integer] partition count passed to +map+
# @param jobs [Integer, nil] passed to +map+ unchanged
# @param partitioner [Object, nil] passed to +map+ unchanged
# @param worker [Object] passed to +map+ (defaults to @worker)
# @param before [Object, nil] passed to +map+ unchanged
# @return [Object] whatever +map+ returns
def repartition(partitions:, jobs: nil, partitioner: nil, worker: @worker, before: nil)
  # Identity mapping: forward each pair to the collector as-is.
  pass_through = proc { |key, value, collector| collector.call(key, value) }

  map(
    jobs: jobs,
    partitions: partitions,
    partitioner: partitioner,
    worker: worker,
    before: before,
    &pass_through
  )
end