Class: ParallelQueue
- Inherits:
-
Queue
- Object
- Queue
- ParallelQueue
- Includes:
- Parallel::ProcessorCount
- Defined in:
- lib/parallelQueue.rb
Overview
ParallelQueue is nothing but a regular queue with the ability to store blocks or methods (plus aruments)
Instance Method Summary collapse
-
#justRun(workers = processor_count) ⇒ Object
run the given calls WITHOUT automatic result storage, but faster.
-
#push(*item, &block) ⇒ Object
puts code to the queue as a * method: push(method,arg1,arg2,…) * block: push { … }.
- #queue_push ⇒ Object
-
#run(workers = processor_count) ⇒ Object
run things with the ‘parallel’ library - results are returned automatically.
Instance Method Details
#justRun(workers = processor_count) ⇒ Object
run the given calls WITHOUT automatic result storage, but faster
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/parallelQueue.rb', line 46 def justRun(workers=processor_count) @threads = (1..workers).map {|i| Thread.new(self) {|q| until ( q == ( task = q.deq ) ) if task.size > 1 if task[0].kind_of? Proc # Expects proc/lambda with arguments, e.g. [mysqrt,2.789] task[0].call(*task[1..-1]) else # expect an object in task[0] and one of its methods with # arguments in task[1] as a symbol # e.g. [a,[:attribute=,1] task[0].send(task[1],*task[2..-1]) end else task[0].call end end } } @threads.size.times { self.enq self} @threads.each {|t| t.join} end |
#push(*item, &block) ⇒ Object
puts code to the queue as a
-
method: push(method,arg1,arg2,…)
-
block: push { … }
17 18 19 20 |
# File 'lib/parallelQueue.rb', line 17 def push (*item, &block) queue_push(item ) unless item.empty? queue_push([block]) unless block.nil? end |
#queue_push ⇒ Object
11 |
# File 'lib/parallelQueue.rb', line 11 alias :queue_push :push |
#run(workers = processor_count) ⇒ Object
run things with the ‘parallel’ library - results are returned automatically
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/parallelQueue.rb', line 23 def run(workers=processor_count) queue_push(Parallel::Stop) Parallel.map(self,:in_threads => workers) {|task| if task.size > 1 if task[0].kind_of? Proc # Expects proc/lambda with arguments,e.g. # [mysqrt,2.789] # [myproc,x,y,z] task[0].call(*task[1..-1]) else # expect an object in task[0] and one of its methods with arguments # in task[1] as a symbol # e.g. [a,[:attribute=,1] or # Math,:exp,0 task[0].send(task[1],*task[2..-1]) end else task[0].call end } end |