Class: Thread::Pipe
- Inherits: Object
- Defined in: lib/thread/pipe.rb
Overview
A pipe lets you run a series of tasks over a set of data in parallel. Each datum inserted into the pipe is passed through queues to each of the functions composing the pipe in turn, and the final result is placed in the output queue.
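The idea described above can be sketched with nothing but the standard library's Queue and Thread. This is an illustration of the concept, not the implementation in lib/thread/pipe.rb; `run_pipeline` is a hypothetical helper introduced only for this example.

```ruby
# Sketch only: one thread per function, with a queue between each pair of stages.
# `run_pipeline` is a hypothetical helper, not part of Thread::Pipe.
def run_pipeline(data, *funcs)
  # One queue before the first stage, one after every stage.
  queues = Array.new(funcs.size + 1) { Queue.new }

  workers = funcs.each_with_index.map do |func, i|
    Thread.new do
      # Queue#deq blocks until the previous stage has produced a datum.
      loop { queues[i + 1].enq(func.call(queues[i].deq)) }
    end
  end

  data.each { |d| queues.first.enq(d) }

  # Collect exactly one result per input, then stop the stage threads.
  results = Array.new(data.size) { queues.last.deq }
  workers.each(&:kill)
  results
end

p run_pipeline([1, 2, 3], ->(d) { d * 2 }, ->(d) { d + 1 })  # => [3, 5, 7]
```

Because each stage here is a single thread reading from a FIFO queue, results come out in insertion order while different stages work on different data at the same time.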
Defined Under Namespace
Classes: Task
Class Method Summary
-
.finalizer(tasks) ⇒ Object
Return a proc that calls #kill on each of the given tasks.
Instance Method Summary
-
#deq(non_block = false) ⇒ Object
(also: #pop, #~)
Get an element from the output queue.
-
#empty? ⇒ Boolean
Check if the pipe is empty.
-
#enq(data) ⇒ Object
(also: #push, #<<)
Insert data in the pipe.
-
#initialize(input = Queue.new, output = Queue.new) ⇒ Pipe
constructor
Create a pipe using the optionally passed objects as input and output queue.
-
#|(func) ⇒ Object
Add a task to the pipe. The task must respond to #call and #arity, and #arity must return 1.
Constructor Details
#initialize(input = Queue.new, output = Queue.new) ⇒ Pipe
Create a pipe using the optionally passed objects as input and output queue.
The objects must respond to #enq and #deq, and block on #deq.
    # File 'lib/thread/pipe.rb', line 57

    def initialize (input = Queue.new, output = Queue.new)
      @tasks  = []

      @input  = input
      @output = output

      ObjectSpace.define_finalizer self, self.class.finalizer(@tasks)
    end
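As an illustration of the queue contract stated above, here is a minimal sketch of a custom queue object. `CountingQueue` is a hypothetical class invented for this example; it delegates to a standard Queue so that #deq blocks while the queue is empty. It also defines #empty?, since Pipe#empty? calls that method on both queues. Whether Task relies on any further queue methods is not shown on this page, so treat the list of methods as an assumption.

```ruby
# Hypothetical wrapper: counts enqueued items while delegating to a stdlib Queue.
class CountingQueue
  attr_reader :count

  def initialize
    @queue = Queue.new
    @count = 0
    @lock  = Mutex.new
  end

  # Add an item and return the queue itself so calls can be chained.
  def enq(item)
    @lock.synchronize { @count += 1 }
    @queue.enq(item)
    self
  end

  # Blocks while the queue is empty unless non_block is true, as Queue#deq does.
  def deq(non_block = false)
    @queue.deq(non_block)
  end

  def empty?
    @queue.empty?
  end
end

queue = CountingQueue.new
queue.enq(:a).enq(:b)
p queue.count  # => 2
p queue.deq    # => :a
```

Going by the constructor signature, an object like this would be passed as `Thread::Pipe.new(CountingQueue.new)` to replace the input queue while keeping the default output queue.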
Class Method Details
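.finalizer(tasks) ⇒ Object
Return a proc that calls #kill on each of the given tasks. The constructor registers this proc as the pipe's finalizer.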
    # File 'lib/thread/pipe.rb', line 67

    def self.finalizer (tasks)
      proc {
        tasks.each(&:kill)
      }
    end
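The finalizer proc is built in a class method, presumably so that it closes over the task list only and never over the pipe itself. A finalizer that references its own object keeps that object reachable, so it would never be collected. The sketch below shows the same pattern on a hypothetical `Worker` class that holds plain threads instead of Task objects.

```ruby
# Hypothetical class illustrating the finalizer pattern; not part of Thread::Pipe.
class Worker
  attr_reader :threads

  # Built at class level: the proc captures `threads`, not the Worker instance.
  def self.finalizer(threads)
    proc { threads.each(&:kill) }
  end

  def initialize
    @threads = [Thread.new { sleep }]  # a thread that would otherwise sleep forever
    ObjectSpace.define_finalizer(self, self.class.finalizer(@threads))
  end
end

worker  = Worker.new
cleanup = Worker.finalizer(worker.threads)

cleanup.call                  # what the garbage collector would invoke, called by hand here
worker.threads.each(&:join)   # wait for the killed threads to finish
p worker.threads.none?(&:alive?)  # => true
```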
Instance Method Details
#deq(non_block = false) ⇒ Object Also known as: pop, ~
Get an element from the output queue.
    # File 'lib/thread/pipe.rb', line 106

    def deq (non_block = false)
      @output.deq(non_block)
    end
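Since #deq forwards `non_block` to the output queue, its behavior on an empty pipe depends on that queue. Assuming the default standard-library Queue, a blocking call waits for a result, while a non-blocking call raises ThreadError when nothing is available. The sketch below shows this on a bare Queue; `try_deq` is a hypothetical helper.

```ruby
# Hypothetical helper: non-blocking dequeue that returns nil instead of raising.
def try_deq(queue)
  queue.deq(true)   # non_block = true: raise ThreadError if the queue is empty
rescue ThreadError
  nil
end

queue = Queue.new
p try_deq(queue)    # => nil

queue.enq(:ready)
p try_deq(queue)    # => :ready
```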
#empty? ⇒ Boolean
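Check whether the pipe is empty. The pipe counts as empty only when the input queue, the output queue, and every task are all empty.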
    # File 'lib/thread/pipe.rb', line 89

    def empty?
      @input.empty? && @output.empty? && @tasks.all?(&:empty?)
    end
#enq(data) ⇒ Object Also known as: push, <<
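Insert data into the pipe. Returns the pipe itself so that calls can be chained. If no task has been added yet, the data is discarded and nil is returned.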
    # File 'lib/thread/pipe.rb', line 94

    def enq (data)
      return if @tasks.empty?

      @input.enq data

      self
    end
#|(func) ⇒ Object
Add a task to the pipe. The task must respond to #call and #arity, and #arity must return 1; otherwise an ArgumentError is raised.
    # File 'lib/thread/pipe.rb', line 75

    def | (func)
      if func.arity != 1
        raise ArgumentError, 'wrong arity'
      end

      Task.new(func, (@tasks.empty? ? @input : Queue.new), @output).tap {|t|
        @tasks.last.output = t.input unless @tasks.empty?
        @tasks << t
      }

      self
    end
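To see which callables pass the arity requirement, the check can be tried on a few standard Ruby objects. `pipeable?` is a hypothetical helper that mirrors the documented requirement; the method above tests only `func.arity != 1`.

```ruby
# Hypothetical helper mirroring the documented requirement for a task.
def pipeable?(func)
  func.respond_to?(:call) && func.respond_to?(:arity) && func.arity == 1
end

p pipeable?(->(d) { d * 2 })      # => true
p pipeable?(proc { |d| d * 2 })   # => true
p pipeable?(1.method(:+))         # => true  (Integer#+ takes one argument)
p pipeable?(->(a, b) { a + b })   # => false (arity is 2)
p pipeable?(->(*args) { args })   # => false (arity is -1)
```

Callables that take optional or splat arguments report a negative or zero arity, so they would be rejected even though they could be called with a single datum. Wrapping them in a one-argument lambda avoids the error.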