Class: Autobuild::RakeTaskParallelism
- Inherits: Object
  - Object
    - Autobuild::RakeTaskParallelism
- Defined in: lib/autobuild/parallel.rb
Overview
This is a rewrite of the Rake task invocation code that runs tasks in parallel.
Since autobuild does not use task arguments, we don’t support them, for simplicity.
Defined Under Namespace
Classes: JobServer, ProcessingState, Worker
Instance Attribute Summary
- #available_workers ⇒ Object (readonly)
  Returns the value of attribute available_workers.
- #finished_workers ⇒ Object (readonly)
  Returns the value of attribute finished_workers.
- #job_server ⇒ Object (readonly)
  Returns the value of attribute job_server.
- #workers ⇒ Object (readonly)
  Returns the value of attribute workers.
Instance Method Summary
- #discover_dependencies(all_tasks, reverse_dependencies, task) ⇒ Object
- #finish_pending_work ⇒ Object
- #initialize(level = Autobuild.parallel_build_level) ⇒ RakeTaskParallelism (constructor)
  Returns a new instance of RakeTaskParallelism.
- #invoke_parallel(required_tasks, completion_callback: proc {}) ⇒ Object
  Invokes the provided tasks.
- #resolve_cycle(all_tasks, tasks, reverse_dependencies) ⇒ Object
- #wait_for_worker_to_end(state) ⇒ Object
Constructor Details
#initialize(level = Autobuild.parallel_build_level) ⇒ RakeTaskParallelism
Returns a new instance of RakeTaskParallelism.
(lines 29–34)
# File 'lib/autobuild/parallel.rb', line 29
# Creates a parallel task invoker.
#
# @param level the parallelism level handed to JobServer.new (presumably
#   the maximum number of concurrent jobs -- confirm in JobServer);
#   defaults to Autobuild.parallel_build_level
def initialize(level = Autobuild.parallel_build_level)
  @job_server = JobServer.new(level)
  # Idle workers; invoke_parallel pops one from here to run a task
  @available_workers = []
  # Queue handed to every Worker.new; wait_for_worker_to_end and
  # finish_pending_work pop finished workers from it
  @finished_workers = Queue.new
  # Every worker created so far, idle or busy
  @workers = []
end
Instance Attribute Details
#available_workers ⇒ Object (readonly)
Returns the value of attribute available_workers.
(lines 7–9)
# File 'lib/autobuild/parallel.rb', line 7
# Workers that are currently idle and can be handed a new task.
#
# @return [Array] the idle workers
def available_workers
  @available_workers
end
#finished_workers ⇒ Object (readonly)
Returns the value of attribute finished_workers.
(lines 7–9)
# File 'lib/autobuild/parallel.rb', line 7
# Queue from which workers that completed their task are popped.
#
# @return [Queue] the finished-workers queue
def finished_workers
  @finished_workers
end
#job_server ⇒ Object (readonly)
Returns the value of attribute job_server.
(lines 7–9)
# File 'lib/autobuild/parallel.rb', line 7
# The job server that hands out the tokens needed to start a worker.
#
# @return the job server
def job_server
  @job_server
end
#workers ⇒ Object (readonly)
Returns the value of attribute workers.
(lines 7–9)
# File 'lib/autobuild/parallel.rb', line 7
# All the workers created so far, whether idle or busy.
#
# @return [Array] the workers
def workers
  @workers
end
Instance Method Details
#discover_dependencies(all_tasks, reverse_dependencies, task) ⇒ Object
(lines 61–71)
# File 'lib/autobuild/parallel.rb', line 61
# Recursively collects +task+ and the prerequisites it transitively needs.
#
# Each newly visited task is appended to +all_tasks+. For every
# prerequisite edge, the dependent task is recorded in
# +reverse_dependencies[prerequisite]+. This also happens for
# prerequisites that were already invoked, even though those are not
# visited themselves.
#
# @param all_tasks collection of discovered tasks (mutated); a Set when
#   called from invoke_parallel
# @param reverse_dependencies [Hash] prerequisite => tasks depending on it
#   (mutated); must create an empty collection on first access of a key
# @param task the task to start the walk from
def discover_dependencies(all_tasks, reverse_dependencies, task)
  # Tasks that already ran need no processing. Tasks already in
  # all_tasks have been (or are being) visited; skipping them also keeps
  # the recursion finite on cyclic graphs.
  return if task.already_invoked? || all_tasks.include?(task)

  all_tasks << task
  task.prerequisite_tasks.each do |prerequisite|
    reverse_dependencies[prerequisite] << task
    discover_dependencies(all_tasks, reverse_dependencies, prerequisite)
  end
end
#finish_pending_work ⇒ Object
(lines 310–315)
# File 'lib/autobuild/parallel.rb', line 310
# Blocks until every worker is idle again, moving each finished worker
# back into available_workers. The workers' results are not examined.
def finish_pending_work
  until available_workers.size == workers.size
    available_workers << finished_workers.pop
  end
end
#invoke_parallel(required_tasks, completion_callback: proc {}) ⇒ Object
Invokes the provided tasks. Unlike the rake code, this is a top-level algorithm that does not use recursion.
(lines 151–224)
# File 'lib/autobuild/parallel.rb', line 151
# Invokes the provided tasks, along with the prerequisites they need,
# using a pool of workers bounded by the job server.
#
# Unlike the rake code, this is a top-level loop that does not recurse
# into prerequisites. A task is scheduled only once the processing state
# reports it as ready.
#
# @param required_tasks the tasks that must be invoked
# @param completion_callback [Proc] passed as-is to ProcessingState.new
#   (presumably called when tasks complete -- confirm in ProcessingState)
# @raise [RuntimeError] if some tasks were never invoked, reported as a
#   cycle in the task graph (see #resolve_cycle)
# @raise any error reported by a worker (see #wait_for_worker_to_end)
def invoke_parallel(required_tasks, completion_callback: proc {})
  # Discover all the tasks that need processing. Along the way, build the
  # reverse dependency graph, i.e. a mapping from a task to the tasks
  # that depend on it.
  tasks = Set.new
  reverse_dependencies = Hash.new { |h, k| h[k] = Set.new }
  required_tasks.each do |t|
    discover_dependencies(tasks, reverse_dependencies, t)
  end

  # The queue is the set of tasks for which all prerequisites have
  # been successfully executed (or were not needed). I.e. it is the
  # set of tasks that can be queued for execution.
  state = ProcessingState.new(reverse_dependencies,
                              completion_callback: completion_callback)
  tasks.each do |t|
    state.push(t) if state.ready?(t)
  end

  # This is kind-of a topological sort. However, we don't do the full
  # topological sort since we would then have to scan all tasks each
  # time for tasks that have no currently running prerequisites
  loop do
    pending_task = state.pop
    unless pending_task
      # If we have pending workers, wait for one to be finished
      # until either they are all finished or the queue is not
      # empty anymore
      while !pending_task && available_workers.size != workers.size
        wait_for_worker_to_end(state)
        pending_task = state.pop
      end
      # The loop above only exits without a task once all workers are
      # idle, i.e. when there is nothing left to run
      break unless pending_task
    end

    bypass_task = pending_task.disabled? ||
                  pending_task.already_invoked? ||
                  !pending_task.needed?
    if bypass_task
      pending_task.already_invoked = true
      state.process_finished_task(pending_task)
      next
    elsif state.trivial_task?(pending_task)
      # Trivial tasks are executed right here, without taking a job
      # server token
      Worker.execute_task(pending_task)
      state.process_finished_task(pending_task)
      next
    end

    # Get a job server token
    job_server.get
    # Process the workers that finished in the meantime so that their
    # dependents get queued and they can be reused below.
    # NOTE(review): if one of them reports an error, the exception
    # propagates while the token acquired above is still held -- confirm
    # whether the JobServer needs it returned in that case
    wait_for_worker_to_end(state) until finished_workers.empty?

    # We do have a job server token, so we are allowed to allocate a
    # new worker if none are available
    if available_workers.empty?
      w = Worker.new(job_server, finished_workers)
      available_workers << w
      workers << w
    end

    worker = available_workers.pop
    state.mark_as_active(pending_task)
    worker.queue(pending_task)
  end

  # Tasks that were never invoked are blocked by a prerequisite that
  # never completed; report them as a cycle (resolve_cycle raises itself
  # if it cannot find one)
  not_processed = tasks.reject(&:already_invoked?)
  unless not_processed.empty?
    cycle = resolve_cycle(tasks, not_processed, reverse_dependencies)
    raise "cycle in task graph: #{cycle.map(&:name).sort.join(', ')}"
  end
end
#resolve_cycle(all_tasks, tasks, reverse_dependencies) ⇒ Object
(lines 226–266)
# File 'lib/autobuild/parallel.rb', line 226
# Finds a dependency cycle among tasks that could not be processed.
#
# Starting from the first of +tasks+, it repeatedly follows a prerequisite
# that is itself among the not-yet-visited +tasks+. When it reaches a task
# already on the current chain, the part of the chain from that task
# onwards is the cycle.
#
# @param all_tasks all the tasks known to the invocation; only used for
#   diagnostics
# @param tasks the tasks that were not processed. It is not modified.
# @param reverse_dependencies [Hash] task => tasks that depend on it; only
#   used for diagnostics
# @return [Array] the tasks forming the cycle, where each task depends on
#   the next one and the last depends on the first. Empty if +tasks+ is
#   empty.
# @raise [RuntimeError] if the walk reaches a task with no unvisited
#   prerequisite among +tasks+. The state is dumped with Autobuild.fatal
#   before raising.
def resolve_cycle(all_tasks, tasks, reverse_dependencies)
  return [] if tasks.empty?

  # Walk over a copy: visited tasks are removed from it, while the
  # caller's collection stays intact (and is used for diagnostics below)
  remaining = tasks.dup
  chain = []
  next_task = remaining.first
  loop do
    task = next_task
    chain << task
    remaining.delete(task)
    next_task = task.prerequisite_tasks.find do |dep_task|
      # Reaching a task that is already on the chain closes the cycle
      return chain.drop_while { |t| t != dep_task } if chain.include?(dep_task)

      remaining.include?(dep_task)
    end
    next if next_task

    Autobuild.fatal "parallel processing stopped prematurely, "\
                    "but no cycle is present in the remaining tasks"
    Autobuild.fatal "remaining tasks: #{tasks.map(&:name).join(', ')}"
    Autobuild.fatal "known dependencies at initialization time that "\
                    "could block the processing of the remaining tasks"
    reverse_dependencies.each do |parent_task, parents|
      next unless tasks.include?(parent_task)

      parents.each do |p|
        Autobuild.fatal " #{p}: #{parent_task}"
      end
    end
    Autobuild.fatal "known dependencies right now that could block "\
                    "the processing of the remaining tasks"
    all_tasks.each do |p|
      (tasks & p.prerequisite_tasks).each do |t|
        Autobuild.fatal " #{p}: #{t}"
      end
    end
    raise "failed to resolve cycle in #{tasks.map(&:name).join(', ')}"
  end
end
#wait_for_worker_to_end(state) ⇒ Object
(lines 36–59)
# File 'lib/autobuild/parallel.rb', line 36
# Waits for one worker to finish. Then it either records the worker's task
# as processed or propagates the worker's error.
#
# In both cases the worker is put back into available_workers. On error,
# all the other running workers are waited for (see #finish_pending_work)
# before the worker's error is re-raised.
#
# @param state the processing state that receives finished tasks
# @return the value returned by state.process_finished_task on success
# @raise the exception reported by the worker
def wait_for_worker_to_end(state)
  worker = finished_workers.pop
  finished_task, error = worker.last_result
  available_workers << worker
  return state.process_finished_task(finished_task) unless error

  # Only tell the user we are waiting if some workers are still busy
  unless available_workers.size == workers.size
    message =
      if finished_task.respond_to?(:package) && finished_task.package
        "got an error processing "\
        "#{finished_task.package.name}, "\
        "waiting for pending jobs to end"
      else
        "got an error doing parallel processing, "\
        "waiting for pending jobs to end"
      end
    Autobuild.error message
  end

  begin
    finish_pending_work
  ensure
    # Raising from ensure replaces any exception from finish_pending_work,
    # so the worker's error is always the one that reaches the caller
    raise error
  end
end