Class: Autobuild::RakeTaskParallelism

Inherits:
Object
Defined in:
lib/autobuild/parallel.rb

Overview

This is a rewrite of the Rake task invocation code that runs tasks in parallel.

Since autobuild does not use task arguments, they are not supported, for simplicity.

Defined Under Namespace

Classes: JobServer, ProcessingState, Worker

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(level = Autobuild.parallel_build_level) ⇒ RakeTaskParallelism

Returns a new instance of RakeTaskParallelism.



29
30
31
32
33
34
# File 'lib/autobuild/parallel.rb', line 29

# Creates a parallel task invoker.
#
# @param level value handed to JobServer.new (presumably the maximum
#   number of concurrent jobs -- confirm against JobServer); defaults to
#   Autobuild.parallel_build_level
def initialize(level = Autobuild.parallel_build_level)
    @job_server = JobServer.new(level)
    # Workers hand themselves back through this queue (it is given to
    # each Worker at construction time)
    @finished_workers = Queue.new
    # Idle workers, ready to be handed a task
    @available_workers = []
    # Every worker created so far, busy or idle
    @workers = []
end

Instance Attribute Details

#available_workers ⇒ Object (readonly)

Returns the value of attribute available_workers.



7
8
9
# File 'lib/autobuild/parallel.rb', line 7

def available_workers
  @available_workers
end

#finished_workers ⇒ Object (readonly)

Returns the value of attribute finished_workers.



7
8
9
# File 'lib/autobuild/parallel.rb', line 7

# Queue from which workers that finished their task are popped so that
# their result can be processed.
def finished_workers; @finished_workers; end

#job_server ⇒ Object (readonly)

Returns the value of attribute job_server.



7
8
9
# File 'lib/autobuild/parallel.rb', line 7

# The job server a token is obtained from before a task is given to a
# worker.
def job_server; @job_server; end

#workers ⇒ Object (readonly)

Returns the value of attribute workers.



7
8
9
# File 'lib/autobuild/parallel.rb', line 7

# All the workers created so far, whether busy or idle.
def workers; @workers; end

Instance Method Details

#discover_dependencies(all_tasks, reverse_dependencies, task) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
# File 'lib/autobuild/parallel.rb', line 61

# Recursively registers +task+ and its not-yet-invoked prerequisites.
#
# Tasks that are already invoked, or already present in +all_tasks+
# (discovered, or currently being discovered further up the recursion),
# are skipped.
#
# @param all_tasks collection receiving every discovered task, in
#   discovery order
# @param reverse_dependencies [Hash] mapping from a prerequisite to the
#   collection of tasks that depend on it; must create missing entries
#   on access
# @param task the task to start the discovery from
# @return [void]
def discover_dependencies(all_tasks, reverse_dependencies, task)
    return if task.already_invoked? || all_tasks.include?(task)

    all_tasks << task
    task.prerequisite_tasks.each do |prerequisite|
        # Record the edge even when the prerequisite itself is skipped
        reverse_dependencies[prerequisite] << task
        discover_dependencies(all_tasks, reverse_dependencies, prerequisite)
    end
end

#finish_pending_work ⇒ Object



310
311
312
313
314
315
# File 'lib/autobuild/parallel.rb', line 310

# Blocks until every worker is back in the list of available workers.
#
# Workers are taken from +finished_workers+ one at a time, blocking
# until one is there; their results are not processed.
#
# @return [nil]
def finish_pending_work
    until available_workers.size == workers.size
        available_workers << finished_workers.pop
    end
end

#invoke_parallel(required_tasks, completion_callback: proc {}) ⇒ Object

Invokes the provided tasks. Unlike the Rake code, this is a top-level algorithm that does not use recursion.



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/autobuild/parallel.rb', line 151

# Invokes the provided tasks and all their not-yet-invoked
# prerequisites, handing tasks that need to run to worker objects once
# a job server token has been obtained.
#
# Unlike Rake's own invocation code, this is a top-level loop rather
# than a recursive traversal.
#
# @param required_tasks [Enumerable] the tasks that must be invoked
# @param completion_callback [Proc] forwarded to ProcessingState.new
# @raise the error reported by a worker, re-raised by
#   #wait_for_worker_to_end once the pending jobs have finished
# @raise [RuntimeError] if some discovered tasks were never invoked; the
#   message lists the dependency cycle found by #resolve_cycle
def invoke_parallel(required_tasks, completion_callback: proc {})
    # Collect every task to process, and build a reverse dependency
    # graph (i.e. a mapping from a task to the tasks that depend on it)
    tasks = Set.new
    reverse_dependencies = Hash.new { |h, k| h[k] = Set.new }
    required_tasks.each do |t|
        discover_dependencies(tasks, reverse_dependencies, t)
    end
    # The state's queue is the set of tasks for which all prerequisites
    # have been successfully executed (or were not needed), i.e. the set
    # of tasks that can be queued for execution. It is seeded here with
    # the tasks that are ready right away.
    state = ProcessingState.new(reverse_dependencies,
                                completion_callback: completion_callback)
    tasks.each do |t|
        state.push(t) if state.ready?(t)
    end

    # This is kind-of a topological sort. However, we don't do the full
    # topological sort since we would then have to scan all tasks each
    # time for tasks that have no currently running prerequisites

    loop do
        pending_task = state.pop
        unless pending_task
            # If we have pending workers, wait for one to be finished
            # until either they are all finished or the queue is not
            # empty anymore
            while !pending_task && available_workers.size != workers.size
                wait_for_worker_to_end(state)
                pending_task = state.pop
            end

            # Nothing is queued and all workers are available: done
            break if !pending_task && available_workers.size == workers.size
        end

        # Tasks that do not need to run are marked as invoked and
        # reported as finished right away, without taking a job server
        # token
        bypass_task = pending_task.disabled? ||
                      pending_task.already_invoked? ||
                      !pending_task.needed?

        if bypass_task
            pending_task.already_invoked = true
            state.process_finished_task(pending_task)
            next
        elsif state.trivial_task?(pending_task)
            # Trivial tasks are executed synchronously here instead of
            # being handed to a worker
            Worker.execute_task(pending_task)
            state.process_finished_task(pending_task)
            next
        end

        # Get a job server token
        job_server.get

        # Process the results of the workers that have already finished,
        # which puts them back into the available workers
        wait_for_worker_to_end(state) until finished_workers.empty?

        # We do have a job server token, so we are allowed to allocate a
        # new worker if none are available
        if available_workers.empty?
            w = Worker.new(job_server, finished_workers)
            available_workers << w
            workers << w
        end

        worker = available_workers.pop
        state.mark_as_active(pending_task)
        worker.queue(pending_task)
    end

    # Tasks that were never invoked could never become ready. Look for a
    # dependency cycle among them to report it
    not_processed = tasks.find_all { |t| !t.already_invoked? }
    unless not_processed.empty?
        cycle = resolve_cycle(tasks, not_processed, reverse_dependencies)
        raise "cycle in task graph: #{cycle.map(&:name).sort.join(', ')}"
    end
end

#resolve_cycle(all_tasks, tasks, reverse_dependencies) ⇒ Object



226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/autobuild/parallel.rb', line 226

# Finds a dependency cycle among tasks that could not be processed.
#
# Starting from the first unprocessed task, this follows prerequisite
# links that stay within the unprocessed set and records the path it
# takes. As soon as it reaches a prerequisite that is already on the
# path, the part of the path starting at that prerequisite is returned.
#
# @param all_tasks [Enumerable] all the tasks discovered for this
#   invocation; only used to report diagnostics
# @param tasks [Array] the tasks that were not processed. This list is
#   not modified.
# @param reverse_dependencies [Hash] mapping from a task to the tasks
#   that depend on it; only used to report diagnostics
# @return [Array] the tasks forming the cycle, in the order in which
#   they were walked (empty if +tasks+ is empty)
# @raise [RuntimeError] if the walk reaches a task with no unprocessed
#   prerequisite, i.e. no cycle could be found. The remaining tasks and
#   their dependencies are reported through Autobuild.fatal first.
def resolve_cycle(all_tasks, tasks, reverse_dependencies)
    # Kept untouched for the error report
    cycle = tasks.dup
    # Tasks not walked yet. Work on a copy so that the caller's list is
    # left intact
    remaining = tasks.dup
    return [] if remaining.empty?

    chain = []
    # Position of each walked task within +chain+. This gives
    # constant-time membership tests and directly tells where the cycle
    # starts
    chain_index = {}
    next_task = remaining.first
    loop do
        task = next_task
        chain_index[task] = chain.size
        chain << task
        remaining.delete(task)

        next_task = task.prerequisite_tasks.find do |dep_task|
            if (cycle_start = chain_index[dep_task])
                return chain[cycle_start..-1]
            end

            remaining.include?(dep_task)
        end
        next if next_task

        # Dead end: none of the prerequisites is unprocessed, so the
        # walk cannot continue. Dump what we know and give up.
        Autobuild.fatal "parallel processing stopped prematurely, "\
            "but no cycle is present in the remaining tasks"
        Autobuild.fatal "remaining tasks: #{cycle.map(&:name).join(', ')}"
        Autobuild.fatal "known dependencies at initialization time that "\
            "could block the processing of the remaining tasks"
        reverse_dependencies.each do |prerequisite, dependents|
            next unless cycle.include?(prerequisite)

            dependents.each do |p|
                Autobuild.fatal "  #{p}: #{prerequisite}"
            end
        end
        Autobuild.fatal "known dependencies right now that could block "\
            "the processing of the remaining tasks"
        all_tasks.each do |p|
            (cycle & p.prerequisite_tasks).each do |t|
                Autobuild.fatal "  #{p}: #{t}"
            end
        end
        raise "failed to resolve cycle in #{cycle.map(&:name).join(', ')}"
    end
end

#wait_for_worker_to_end(state) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/autobuild/parallel.rb', line 36

# Blocks until a worker finishes, puts it back into the available
# workers and processes its result.
#
# If the worker reported an error, the error is logged (when some
# workers are not available yet), all pending work is waited for, and
# the error is then re-raised -- even if waiting itself raises.
#
# @param state the processing state notified of the finished task
# @return the value returned by state.process_finished_task
# @raise the error reported by the worker
def wait_for_worker_to_end(state)
    worker = finished_workers.pop
    finished_task, error = worker.last_result
    available_workers << worker
    return state.process_finished_task(finished_task) unless error

    unless available_workers.size == workers.size
        has_package = finished_task.respond_to?(:package) &&
                      finished_task.package
        if has_package
            Autobuild.error "got an error processing "\
                "#{finished_task.package.name}, "\
                "waiting for pending jobs to end"
        else
            Autobuild.error "got an error doing parallel processing, "\
                "waiting for pending jobs to end"
        end
    end

    begin
        finish_pending_work
    ensure
        # Always surface the worker's error, not whatever interrupted
        # the wait
        raise error
    end
end