Class: ProducerConsumer
- Inherits:
-
Object
- Object
- ProducerConsumer
- Defined in:
- lib/producer_consumer.rb,
lib/producer_consumer/version.rb
Defined Under Namespace
Classes: Finished, NoConsumer, NoProducer, Producer
Constant Summary collapse
- VERSION =
'0.1'
Class Method Summary collapse
Instance Method Summary collapse
-
#consume(n = 1, &block) ⇒ Object
Registers n consumers.
- #consumer(block) ⇒ Object
-
#initialize ⇒ ProducerConsumer
constructor
Oh god.
-
#produce(n = 1, &block) ⇒ Object
Registers n producers.
- #producer(block) ⇒ Object
-
#run ⇒ Object
Run!.
Constructor Details
#initialize ⇒ ProducerConsumer
Oh god. It’s a PATTERN.
28 29 30 31 32 33 |
# File 'lib/producer_consumer.rb', line 28 def initialize @queue = Queue.new @state = :free @producers = [] @consumers = [] end |
Class Method Details
.consume(*a, &b) ⇒ Object
19 20 21 |
# File 'lib/producer_consumer.rb', line 19 def self.consume(*a, &b) new.consume(*a, &b) end |
.produce(*a, &b) ⇒ Object
23 24 25 |
# File 'lib/producer_consumer.rb', line 23 def self.produce(*a, &b) new.produce(*a, &b) end |
Instance Method Details
#consume(n = 1, &block) ⇒ Object
Registers n consumers
57 58 59 60 61 62 63 |
# File 'lib/producer_consumer.rb', line 57 def consume(n = 1, &block) @consumers += (0...n).map do |c| block end self end |
#consumer(block) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/producer_consumer.rb', line 65 def consumer(block) Thread.new do begin loop do x = @queue.pop if x == Finished raise Finished end block.call x end rescue Finished end end end |
#produce(n = 1, &block) ⇒ Object
Registers n producers
36 37 38 39 40 41 42 |
# File 'lib/producer_consumer.rb', line 36 def produce(n = 1, &block) @producers += (0...n).map do |i| block end self end |
#producer(block) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/producer_consumer.rb', line 44 def producer(block) Thread.new do p = Producer.new begin loop do @queue << block.call(p) end rescue Finished end end end |
#run ⇒ Object
Run!
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/producer_consumer.rb', line 83 def run # Can't run this more than once. unless @state == :free raise RuntimeError, "Already running" end # Validate we will actually do something. raise NoConsumer if @consumers.empty? raise NoProducer if @producers.empty? # Start producers and consumers @state = :starting consumers = @consumers.map do |block| consumer block end producers = @producers.map do |block| producer block end # Wait for producers to finish @state = :waiting_for_producers producers.each { |t| t.join } # Signal consumers to finish... @state = :waiting_for_consumers consumers.size.times do @queue << Finished end consumers.each { |t| t.join } @state = :free end |