Class: Kraps::Job

Inherits:
Object
Defined in:
lib/kraps/job.rb

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(worker:) ⇒ Job

Returns a new instance of Job.



5
6
7
8
9
10
# File 'lib/kraps/job.rb', line 5

# Creates an empty job bound to +worker+.
#
# A new job has no steps and a partition count of 0. It uses a
# HashPartitioner until another partitioner is supplied; #parallelize sets
# both values, and #map / #map_partitions may override them.
#
# @param worker [Object] default worker recorded on every step this job adds
def initialize(worker:)
  @worker = worker
  @partitions = 0
  @steps = []
  @partitioner = HashPartitioner.new
end

Instance Attribute Details

#steps ⇒ Object (readonly)

Returns the value of attribute steps.



3
4
5
# File 'lib/kraps/job.rb', line 3

# Read-only accessor for the ordered list of steps recorded on this job.
#
# @return [Array<Step>] the steps added so far by the builder methods
def steps; @steps; end

Instance Method Details

#append(other_job, jobs: nil, worker: @worker, before: nil, &block) ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/kraps/job.rb', line 106

# Returns a new job with an APPEND step that depends on +other_job+. The
# receiver itself is left unchanged.
#
# The step records +other_job+ as its dependency, together with the index of
# +other_job+'s most recent step (+append_step_index+). For that reason
# +other_job+ must already contain at least one step. The step keeps this
# job's current partition count and partitioner. Its job count is +jobs+
# capped at the current partition count, or the partition count itself when
# +jobs+ is nil.
#
# @param other_job [Kraps::Job] job whose latest step is referenced
# @param jobs [Integer, nil] requested number of parallel jobs
# @param worker [Object] worker recorded on the step (defaults to the job's worker)
# @param before [Object, nil] passed through to the step unchanged
# @param block [Proc] passed through to the step unchanged
# @return [Kraps::Job] the new job
# @raise [ArgumentError] if +other_job+ has no steps. Otherwise
#   +append_step_index+ would be -1 and would reference no step.
def append(other_job, jobs: nil, worker: @worker, before: nil, &block)
  raise ArgumentError, "other_job has no steps to append" if other_job.steps.empty?

  fresh.tap do |job|
    job.instance_eval do
      @steps << Step.new(
        action: Actions::APPEND,
        jobs: [jobs, @partitions].compact.min,
        partitions: @partitions,
        partitioner: @partitioner,
        worker: worker,
        before: before,
        block: block,
        dependency: other_job,
        options: { append_step_index: other_job.steps.size - 1 }
      )
    end
  end
end

#combine(other_job, jobs: nil, worker: @worker, before: nil, &block) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/kraps/job.rb', line 88

# Returns a new job with a COMBINE step that depends on +other_job+. The
# receiver itself is left unchanged.
#
# The step records +other_job+ as its dependency, together with the index of
# +other_job+'s most recent step (+combine_step_index+). For that reason
# +other_job+ must already contain at least one step. The step keeps this
# job's current partition count and partitioner. Its job count is +jobs+
# capped at the current partition count, or the partition count itself when
# +jobs+ is nil.
#
# @param other_job [Kraps::Job] job whose latest step is referenced
# @param jobs [Integer, nil] requested number of parallel jobs
# @param worker [Object] worker recorded on the step (defaults to the job's worker)
# @param before [Object, nil] passed through to the step unchanged
# @param block [Proc] passed through to the step unchanged
# @return [Kraps::Job] the new job
# @raise [ArgumentError] if +other_job+ has no steps. Otherwise
#   +combine_step_index+ would be -1 and would reference no step.
def combine(other_job, jobs: nil, worker: @worker, before: nil, &block)
  raise ArgumentError, "other_job has no steps to combine with" if other_job.steps.empty?

  fresh.tap do |job|
    job.instance_eval do
      @steps << Step.new(
        action: Actions::COMBINE,
        jobs: [jobs, @partitions].compact.min,
        partitions: @partitions,
        partitioner: @partitioner,
        worker: worker,
        before: before,
        block: block,
        dependency: other_job,
        options: { combine_step_index: other_job.steps.size - 1 }
      )
    end
  end
end

#dump(prefix:, worker: @worker) ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/kraps/job.rb', line 146

# Adds an EACH_PARTITION step (via #each_partition) whose block stores each
# partition as newline-delimited JSON.
#
# For every partition, the block writes each pair on its own line using
# JSON.generate into a Tempfile. It then rewinds the file and hands it to
# Kraps.driver.store under "<prefix>/<partition>/chunk.json". The tempfile is
# closed and unlinked afterwards, even if storing raises.
#
# @param prefix [String] storage path prefix
# @param worker [Object] worker for the step (defaults to the job's worker)
# @return the job returned by #each_partition
def dump(prefix:, worker: @worker)
  each_partition(worker: worker) do |partition, pairs|
    chunk = Tempfile.new
    begin
      pairs.each { |pair| chunk.puts(JSON.generate(pair)) }
      chunk.rewind
      Kraps.driver.store(File.join(prefix, partition.to_s, "chunk.json"), chunk)
    ensure
      chunk.close(true)
    end
  end
end

#each_partition(jobs: nil, worker: @worker, before: nil, &block) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/kraps/job.rb', line 124

# Returns a new job with an EACH_PARTITION step appended. The receiver itself
# is left unchanged.
#
# The step keeps the job's current partition count and partitioner. Its job
# count is +jobs+ capped at the current partition count, or the partition
# count itself when +jobs+ is nil.
#
# @param jobs [Integer, nil] requested number of parallel jobs
# @param worker [Object] worker recorded on the step (defaults to the job's worker)
# @param before [Object, nil] passed through to the step unchanged
# @param block [Proc] passed through to the step unchanged
# @return [Kraps::Job] the new job
def each_partition(jobs: nil, worker: @worker, before: nil, &block)
  job = fresh
  job.instance_eval do
    step_jobs = [jobs, @partitions].compact.min
    step = Step.new(
      action: Actions::EACH_PARTITION,
      jobs: step_jobs,
      partitions: @partitions,
      partitioner: @partitioner,
      worker: worker,
      before: before,
      block: block
    )
    @steps << step
  end
  job
end

#fresh ⇒ Object



184
185
186
187
188
# File 'lib/kraps/job.rb', line 184

# Returns a shallow copy of this job that has its own step array.
#
# Steps appended to the copy do not affect the receiver. All other instance
# variables are shared by reference, because #dup copies them shallowly.
#
# @return [Kraps::Job] the copy
def fresh
  copy = dup
  copy.instance_variable_set(:@steps, @steps.dup)
  copy
end

#load(prefix:, partitions:, partitioner:, concurrency:, worker: @worker) ⇒ Object



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/kraps/job.rb', line 160

# Builds a job that reads partitioned newline-delimited JSON back in. The
# path layout and line format match what #dump writes.
#
# Step 1 (#parallelize) emits each partition index 0...+partitions+. These
# indexes are routed by an identity partitioner that returns the key itself;
# presumably this sends index i to partition i.
#
# Step 2 (#map_partitions, using +partitioner+) handles each partition:
# - downloads every file under "<prefix>/<partition>/" via
#   Downloader.download_all, using +concurrency+;
# - parses each line as a JSON [key, value] pair and emits it;
# - calls #delete on the downloaded set afterwards, even if processing raises.
#
# @param prefix [String] storage path prefix
# @param partitions [Integer] number of partitions to read
# @param partitioner [Object] partitioner for the resulting job
# @param concurrency [Integer] passed to Downloader.download_all
# @param worker [Object] worker for both steps (defaults to the job's worker)
# @return [Kraps::Job] the job returned by #map_partitions
def load(prefix:, partitions:, partitioner:, concurrency:, worker: @worker)
  identity_partitioner = proc { |key, _| key }

  seeded = parallelize(partitions: partitions, partitioner: identity_partitioner, worker: worker) do |collector|
    (0...partitions).each { |partition| collector.call(partition) }
  end

  seeded.map_partitions(partitioner: partitioner, worker: worker) do |partition, _, collector|
    downloaded = nil
    begin
      downloaded = Downloader.download_all(prefix: File.join(prefix, partition.to_s, "/"), concurrency: concurrency)

      downloaded.each do |temp_path|
        File.open(temp_path.path) do |stream|
          stream.each_line do |line|
            key, value = JSON.parse(line)
            collector.call(key, value)
          end
        end
      end
    ensure
      downloaded&.delete
    end
  end
end

#map(partitions: nil, partitioner: nil, jobs: nil, worker: @worker, before: nil, &block) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/kraps/job.rb', line 30

# Returns a new job with a MAP step appended. The receiver itself is left
# unchanged.
#
# The step's job count is +jobs+ capped at the partition count in effect
# before this call, or that partition count itself when +jobs+ is nil. After
# that cap is computed, a truthy +partitions+ or +partitioner+ replaces the
# new job's current value. The step then records the resulting values.
#
# @param partitions [Integer, nil] new partition count, if any
# @param partitioner [Object, nil] new partitioner, if any
# @param jobs [Integer, nil] requested number of parallel jobs
# @param worker [Object] worker recorded on the step (defaults to the job's worker)
# @param before [Object, nil] passed through to the step unchanged
# @param block [Proc] passed through to the step unchanged
# @return [Kraps::Job] the new job
def map(partitions: nil, partitioner: nil, jobs: nil, worker: @worker, before: nil, &block)
  job = fresh
  job.instance_eval do
    # Capture the cap from the partition count *before* any override below.
    capped_jobs = [jobs, @partitions].compact.min

    @partitions = partitions if partitions
    @partitioner = partitioner if partitioner

    step = Step.new(
      action: Actions::MAP,
      jobs: capped_jobs,
      partitions: @partitions,
      partitioner: @partitioner,
      worker: worker,
      before: before,
      block: block
    )
    @steps << step
  end
  job
end

#map_partitions(partitions: nil, partitioner: nil, jobs: nil, worker: @worker, before: nil, &block) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/kraps/job.rb', line 51

# Returns a new job with a MAP_PARTITIONS step appended. The receiver itself
# is left unchanged.
#
# The step's job count is +jobs+ capped at the partition count in effect
# before this call, or that partition count itself when +jobs+ is nil. After
# that cap is computed, a truthy +partitions+ or +partitioner+ replaces the
# new job's current value. The step then records the resulting values.
#
# @param partitions [Integer, nil] new partition count, if any
# @param partitioner [Object, nil] new partitioner, if any
# @param jobs [Integer, nil] requested number of parallel jobs
# @param worker [Object] worker recorded on the step (defaults to the job's worker)
# @param before [Object, nil] passed through to the step unchanged
# @param block [Proc] passed through to the step unchanged
# @return [Kraps::Job] the new job
def map_partitions(partitions: nil, partitioner: nil, jobs: nil, worker: @worker, before: nil, &block)
  job = fresh
  job.instance_eval do
    # Capture the cap from the partition count *before* any override below.
    capped_jobs = [jobs, @partitions].compact.min

    @partitions = partitions if partitions
    @partitioner = partitioner if partitioner

    step = Step.new(
      action: Actions::MAP_PARTITIONS,
      jobs: capped_jobs,
      partitions: @partitions,
      partitioner: @partitioner,
      worker: worker,
      before: before,
      block: block
    )
    @steps << step
  end
  job
end

#parallelize(partitions:, partitioner: HashPartitioner.new, worker: @worker, before: nil, &block) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/kraps/job.rb', line 12

# Returns a new job whose partition count and partitioner are set to
# +partitions+ and +partitioner+, with a PARALLELIZE step appended. The
# receiver itself is left unchanged. Unlike the other builder methods, this
# step records no job count.
#
# @param partitions [Integer] partition count for the new job
# @param partitioner [Object] partitioner for the new job (a new HashPartitioner by default)
# @param worker [Object] worker recorded on the step (defaults to the job's worker)
# @param before [Object, nil] passed through to the step unchanged
# @param block [Proc] passed through to the step unchanged
# @return [Kraps::Job] the new job
def parallelize(partitions:, partitioner: HashPartitioner.new, worker: @worker, before: nil, &block)
  job = fresh
  job.instance_eval do
    @partitions = partitions
    @partitioner = partitioner

    step = Step.new(
      action: Actions::PARALLELIZE,
      partitions: partitions,
      partitioner: partitioner,
      worker: worker,
      before: before,
      block: block
    )
    @steps << step
  end
  job
end

#reduce(jobs: nil, worker: @worker, before: nil, &block) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/kraps/job.rb', line 72

# Returns a new job with a REDUCE step appended. The receiver itself is left
# unchanged.
#
# The step keeps the job's current partition count and partitioner. Its job
# count is +jobs+ capped at the current partition count, or the partition
# count itself when +jobs+ is nil.
#
# @param jobs [Integer, nil] requested number of parallel jobs
# @param worker [Object] worker recorded on the step (defaults to the job's worker)
# @param before [Object, nil] passed through to the step unchanged
# @param block [Proc] passed through to the step unchanged
# @return [Kraps::Job] the new job
def reduce(jobs: nil, worker: @worker, before: nil, &block)
  job = fresh
  job.instance_eval do
    step_jobs = [jobs, @partitions].compact.min
    step = Step.new(
      action: Actions::REDUCE,
      jobs: step_jobs,
      partitions: @partitions,
      partitioner: @partitioner,
      worker: worker,
      before: before,
      block: block
    )
    @steps << step
  end
  job
end

#repartition(partitions:, jobs: nil, partitioner: nil, worker: @worker, before: nil) ⇒ Object



140
141
142
143
144
# File 'lib/kraps/job.rb', line 140

# Returns a new job that redistributes the data into +partitions+ partitions.
#
# This is implemented as a #map whose block passes every (key, value) pair
# through unchanged to the collector. All options are forwarded to #map.
#
# @param partitions [Integer] new partition count
# @param jobs [Integer, nil] requested number of parallel jobs
# @param partitioner [Object, nil] new partitioner, if any
# @param worker [Object] worker for the step (defaults to the job's worker)
# @param before [Object, nil] forwarded to #map unchanged
# @return [Kraps::Job] the job returned by #map
def repartition(partitions:, jobs: nil, partitioner: nil, worker: @worker, before: nil)
  passthrough = proc { |key, value, collector| collector.call(key, value) }
  map(jobs: jobs, partitions: partitions, partitioner: partitioner, worker: worker, before: before, &passthrough)
end