Class: Kraps::Job
- Inherits: Object (ancestry: Object → Kraps::Job)
- Defined in: lib/kraps/job.rb
Instance Attribute Summary
- #steps ⇒ Object (readonly)
  Returns the value of attribute steps.
Instance Method Summary
- #append(other_job, jobs: nil, worker: @worker, before: nil, &block) ⇒ Object
- #combine(other_job, jobs: nil, worker: @worker, before: nil, &block) ⇒ Object
- #dump(prefix:, worker: @worker) ⇒ Object
- #each_partition(jobs: nil, worker: @worker, before: nil, &block) ⇒ Object
- #fresh ⇒ Object
- #initialize(worker:) ⇒ Job (constructor)
  A new instance of Job.
- #load(prefix:, partitions:, partitioner:, concurrency:, worker: @worker) ⇒ Object
- #map(partitions: nil, partitioner: nil, jobs: nil, worker: @worker, before: nil, &block) ⇒ Object
- #map_partitions(partitions: nil, partitioner: nil, jobs: nil, worker: @worker, before: nil, &block) ⇒ Object
- #parallelize(partitions:, partitioner: HashPartitioner.new, worker: @worker, before: nil, &block) ⇒ Object
- #reduce(jobs: nil, worker: @worker, before: nil, &block) ⇒ Object
- #repartition(partitions:, jobs: nil, partitioner: nil, worker: @worker, before: nil) ⇒ Object
Constructor Details
#initialize(worker:) ⇒ Job
Returns a new instance of Job.
Source: lib/kraps/job.rb, lines 5–10
# File 'lib/kraps/job.rb', line 5
#
# Creates an empty job: no steps yet, a partition count of 0 and a
# HashPartitioner as the current partitioner.
#
# @param worker [Object] default worker for every step. Each step-building
#   method accepts its own `worker:` keyword, which falls back to this value.
def initialize(worker:)
  @worker, @steps, @partitions, @partitioner =
    worker, [], 0, HashPartitioner.new
end
Instance Attribute Details
#steps ⇒ Object (readonly)
Returns the value of attribute steps.
Source: lib/kraps/job.rb, lines 3–5
# File 'lib/kraps/job.rb', line 3 def steps @steps end |
Instance Method Details
#append(other_job, jobs: nil, worker: @worker, before: nil, &block) ⇒ Object
Source: lib/kraps/job.rb, lines 106–122
# File 'lib/kraps/job.rb', line 106
#
# Returns a copy of this job (see #fresh) with an APPEND step added. The step
# records +other_job+ as its dependency. The index of +other_job+'s last step is
# passed to the step as the +append_step_index+ option. The receiver is left unchanged.
#
# @param other_job [Kraps::Job] job this step depends on
# @param jobs [Integer, nil] requested job count; nil means "one per partition",
#   and any value is capped at the current partition count
# @param worker [Object] worker for this step (defaults to the job's worker)
# @param before [Object, nil] stored on the step unchanged
# @return [Kraps::Job] the new job
def append(other_job, jobs: nil, worker: @worker, before: nil, &block)
  dependency_index = other_job.steps.size - 1

  job = fresh
  job.instance_eval do
    @steps << Step.new(
      action: Actions::APPEND,
      jobs: [jobs, @partitions].compact.min,
      partitions: @partitions,
      partitioner: @partitioner,
      worker: worker,
      before: before,
      block: block,
      dependency: other_job,
      options: { append_step_index: dependency_index }
    )
  end
  job
end
#combine(other_job, jobs: nil, worker: @worker, before: nil, &block) ⇒ Object
Source: lib/kraps/job.rb, lines 88–104
# File 'lib/kraps/job.rb', line 88
#
# Returns a copy of this job (see #fresh) with a COMBINE step added. The step
# records +other_job+ as its dependency. The index of +other_job+'s last step is
# passed to the step as the +combine_step_index+ option. The receiver is left unchanged.
#
# @param other_job [Kraps::Job] job this step depends on
# @param jobs [Integer, nil] requested job count; nil means "one per partition",
#   and any value is capped at the current partition count
# @param worker [Object] worker for this step (defaults to the job's worker)
# @param before [Object, nil] stored on the step unchanged
# @return [Kraps::Job] the new job
def combine(other_job, jobs: nil, worker: @worker, before: nil, &block)
  fresh.tap do |job|
    # The copy shares this job's partition settings, so reading them from
    # the receiver here yields the same values as reading them from +job+.
    job.steps << Step.new(
      action: Actions::COMBINE,
      jobs: [jobs, @partitions].compact.min,
      partitions: @partitions,
      partitioner: @partitioner,
      worker: worker,
      before: before,
      block: block,
      dependency: other_job,
      options: { combine_step_index: other_job.steps.size - 1 }
    )
  end
end
#dump(prefix:, worker: @worker) ⇒ Object
Source: lib/kraps/job.rb, lines 146–158
# File 'lib/kraps/job.rb', line 146
#
# Adds an EACH_PARTITION step that writes every partition's pairs to storage.
# Each pair is written as one line of JSON to
# "<prefix>/<partition>/chunk.json" through Kraps.driver.store.
#
# The step is only added here; this returns the job built by #each_partition.
#
# @param prefix [String] storage prefix the partition directories go under
# @param worker [Object] worker for the step (defaults to the job's worker)
# @return [Kraps::Job] the new job
def dump(prefix:, worker: @worker)
  each_partition(worker: worker) do |partition, pairs|
    begin
      chunk = Tempfile.new
      pairs.each { |pair| chunk.puts(JSON.generate(pair)) }
      chunk.rewind # hand the driver a stream positioned at the first line
      Kraps.driver.store(File.join(prefix, partition.to_s, "chunk.json"), chunk)
    ensure
      # close(true) also unlinks the temp file; &. covers Tempfile.new failing.
      chunk&.close(true)
    end
  end
end
#each_partition(jobs: nil, worker: @worker, before: nil, &block) ⇒ Object
Source: lib/kraps/job.rb, lines 124–138
# File 'lib/kraps/job.rb', line 124
#
# Returns a copy of this job (see #fresh) with an EACH_PARTITION step added.
# The step keeps the current partition count and partitioner. #dump uses this
# with a block taking |partition, pairs|.
#
# @param jobs [Integer, nil] requested job count; nil means "one per partition",
#   and any value is capped at the current partition count
# @param worker [Object] worker for this step (defaults to the job's worker)
# @param before [Object, nil] stored on the step unchanged
# @return [Kraps::Job] the new job
def each_partition(jobs: nil, worker: @worker, before: nil, &block)
  step = Step.new(
    action: Actions::EACH_PARTITION,
    jobs: [jobs, @partitions].compact.min,
    partitions: @partitions,
    partitioner: @partitioner,
    worker: worker,
    before: before,
    block: block
  )

  fresh.tap { |job| job.steps << step }
end
#fresh ⇒ Object
Source: lib/kraps/job.rb, lines 184–188
# File 'lib/kraps/job.rb', line 184
#
# Returns a shallow copy of this job whose steps array is its own copy. Adding
# steps to the result therefore does not affect the receiver. Other instance
# variables (worker, partitioner, ...) are shared by reference, as with #dup.
#
# @return [Kraps::Job] the copy
def fresh
  copy = dup
  copy.instance_variable_set(:@steps, @steps.dup)
  copy
end
#load(prefix:, partitions:, partitioner:, concurrency:, worker: @worker) ⇒ Object
Source: lib/kraps/job.rb, lines 160–182
# File 'lib/kraps/job.rb', line 160
#
# Builds a job that reads data stored under +prefix+ back in. This is the
# layout #dump writes: one JSON [key, value] array per line, in files under
# "<prefix>/<partition>/".
#
# First, a PARALLELIZE step emits each partition index 0...partitions. Its
# partitioner returns the key itself, so index i presumably lands in
# partition i (NOTE(review): confirm the partitioner contract).
# Second, a MAP_PARTITIONS step downloads every file under that partition's
# directory and emits each parsed pair, routed with +partitioner+.
#
# @param prefix [String] storage prefix to read from
# @param partitions [Integer] number of partitions to read
# @param partitioner [Object] partitioner applied to the loaded pairs
# @param concurrency [Object] passed through to Downloader.download_all
# @param worker [Object] worker for both steps (defaults to the job's worker)
# @return [Kraps::Job] the new job
def load(prefix:, partitions:, partitioner:, concurrency:, worker: @worker)
  identity_partitioner = proc { |key, _| key }

  seeded = parallelize(partitions: partitions, partitioner: identity_partitioner, worker: worker) do |collector|
    (0...partitions).each { |index| collector.call(index) }
  end

  seeded.map_partitions(partitioner: partitioner, worker: worker) do |partition, _, collector|
    downloads = Downloader.download_all(prefix: File.join(prefix, partition.to_s, "/"), concurrency: concurrency)

    downloads.each do |download|
      File.open(download.path) do |file|
        file.each_line do |json_line|
          key, value = JSON.parse(json_line)
          collector.call(key, value)
        end
      end
    end
  ensure
    # &. covers download_all raising before +downloads+ is assigned.
    downloads&.delete
  end
end
#map(partitions: nil, partitioner: nil, jobs: nil, worker: @worker, before: nil, &block) ⇒ Object
Source: lib/kraps/job.rb, lines 30–49
# File 'lib/kraps/job.rb', line 30
#
# Returns a copy of this job (see #fresh) with a MAP step added.
#
# A truthy +partitions+ or +partitioner+ replaces the job's current value for
# this step and for every step added afterwards. The +jobs+ cap is computed
# from the partition count *before* that replacement.
#
# @param partitions [Integer, nil] new partition count, or nil to keep the current one
# @param partitioner [Object, nil] new partitioner, or nil to keep the current one
# @param jobs [Integer, nil] requested job count; nil means "one per partition",
#   and any value is capped at the current partition count
# @param worker [Object] worker for this step (defaults to the job's worker)
# @param before [Object, nil] stored on the step unchanged
# @return [Kraps::Job] the new job
def map(partitions: nil, partitioner: nil, jobs: nil, worker: @worker, before: nil, &block)
  job_limit = [jobs, @partitions].compact.min
  next_partitions = partitions || @partitions
  next_partitioner = partitioner || @partitioner

  fresh.tap do |job|
    job.instance_variable_set(:@partitions, next_partitions)
    job.instance_variable_set(:@partitioner, next_partitioner)
    job.steps << Step.new(
      action: Actions::MAP,
      jobs: job_limit,
      partitions: next_partitions,
      partitioner: next_partitioner,
      worker: worker,
      before: before,
      block: block
    )
  end
end
#map_partitions(partitions: nil, partitioner: nil, jobs: nil, worker: @worker, before: nil, &block) ⇒ Object
Source: lib/kraps/job.rb, lines 51–70
# File 'lib/kraps/job.rb', line 51
#
# Returns a copy of this job (see #fresh) with a MAP_PARTITIONS step added.
#
# A truthy +partitions+ or +partitioner+ replaces the job's current value for
# this step and for every step added afterwards. The +jobs+ cap is computed
# from the partition count *before* that replacement.
#
# @param partitions [Integer, nil] new partition count, or nil to keep the current one
# @param partitioner [Object, nil] new partitioner, or nil to keep the current one
# @param jobs [Integer, nil] requested job count; nil means "one per partition",
#   and any value is capped at the current partition count
# @param worker [Object] worker for this step (defaults to the job's worker)
# @param before [Object, nil] stored on the step unchanged
# @return [Kraps::Job] the new job
def map_partitions(partitions: nil, partitioner: nil, jobs: nil, worker: @worker, before: nil, &block)
  job = fresh

  job.instance_eval do
    capped_jobs = [jobs, @partitions].compact.min
    @partitions = partitions || @partitions
    @partitioner = partitioner || @partitioner

    @steps << Step.new(
      action: Actions::MAP_PARTITIONS,
      jobs: capped_jobs,
      partitions: @partitions,
      partitioner: @partitioner,
      worker: worker,
      before: before,
      block: block
    )
  end

  job
end
#parallelize(partitions:, partitioner: HashPartitioner.new, worker: @worker, before: nil, &block) ⇒ Object
Source: lib/kraps/job.rb, lines 12–28
# File 'lib/kraps/job.rb', line 12
#
# Returns a copy of this job (see #fresh) with a PARALLELIZE step added.
# The copy's partition count and partitioner are set to the given values.
# Unlike the other step builders, this step is created without a +jobs+ value.
#
# @param partitions [Integer] partition count for this and later steps
# @param partitioner [Object] partitioner for this and later steps
#   (defaults to a new HashPartitioner)
# @param worker [Object] worker for this step (defaults to the job's worker)
# @param before [Object, nil] stored on the step unchanged
# @return [Kraps::Job] the new job
def parallelize(partitions:, partitioner: HashPartitioner.new, worker: @worker, before: nil, &block)
  job = fresh
  job.instance_variable_set(:@partitions, partitions)
  job.instance_variable_set(:@partitioner, partitioner)

  job.steps << Step.new(
    action: Actions::PARALLELIZE,
    partitions: partitions,
    partitioner: partitioner,
    worker: worker,
    before: before,
    block: block
  )

  job
end
#reduce(jobs: nil, worker: @worker, before: nil, &block) ⇒ Object
Source: lib/kraps/job.rb, lines 72–86
# File 'lib/kraps/job.rb', line 72
#
# Returns a copy of this job (see #fresh) with a REDUCE step added. The step
# keeps the current partition count and partitioner.
#
# @param jobs [Integer, nil] requested job count; nil means "one per partition",
#   and any value is capped at the current partition count
# @param worker [Object] worker for this step (defaults to the job's worker)
# @param before [Object, nil] stored on the step unchanged
# @return [Kraps::Job] the new job
def reduce(jobs: nil, worker: @worker, before: nil, &block)
  reducer = Step.new(
    action: Actions::REDUCE,
    jobs: [jobs, @partitions].compact.min,
    partitions: @partitions,
    partitioner: @partitioner,
    worker: worker,
    before: before,
    block: block
  )

  copy = fresh
  copy.steps << reducer
  copy
end
#repartition(partitions:, jobs: nil, partitioner: nil, worker: @worker, before: nil) ⇒ Object
Source: lib/kraps/job.rb, lines 140–144
# File 'lib/kraps/job.rb', line 140
#
# Redistributes the data into +partitions+ partitions. This is a #map whose
# block forwards every (key, value) pair to the collector unchanged.
#
# @param partitions [Integer] new partition count
# @param jobs [Integer, nil] forwarded to #map
# @param partitioner [Object, nil] new partitioner, or nil to keep the current one
# @param worker [Object] worker for the step (defaults to the job's worker)
# @param before [Object, nil] forwarded to #map
# @return [Kraps::Job] the new job
def repartition(partitions:, jobs: nil, partitioner: nil, worker: @worker, before: nil)
  # A plain proc (not a lambda) keeps the lenient arity of a literal block.
  passthrough = proc { |key, value, collector| collector.call(key, value) }

  map(jobs: jobs, partitions: partitions, partitioner: partitioner, worker: worker, before: before, &passthrough)
end