Class: ConcurrentPipeline::Producer::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/concurrent_pipeline/producer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Stream

Returns a new instance of Stream.



14
15
16
17
18
# File 'lib/concurrent_pipeline/producer.rb', line 14

# Builds a Stream whose handler table starts with a single +:default+ entry.
#
# The +:default+ handler takes the type as its first argument, ignores any
# further arguments, and logs a warning that no handler exists for that type.
def initialize
  warn_unhandled = lambda do |type, *|
    Log.warn("No stream handler for type: #{type.inspect}")
  end

  @receivers = { default: warn_unhandled }
end

Instance Attribute Details

#receivers ⇒ Object (readonly)

Returns the value of attribute receivers.



13
14
15
# File 'lib/concurrent_pipeline/producer.rb', line 13

# Read-only accessor for the handler table.
#
# @return [Hash] the value of @receivers; #initialize seeds it with a
#   +:default+ entry and #on adds or replaces entries keyed by type
def receivers
  @receivers
end

Instance Method Details

#on(type, &block) ⇒ Object



20
21
22
# File 'lib/concurrent_pipeline/producer.rb', line 20

# Registers the given block as the handler for +type+, replacing any handler
# already stored under that key.
#
# @param type [Object] key under which the handler is stored in #receivers
# @return [Proc, nil] the stored block, or nil when no block was given
def on(type, &block)
  receivers.store(type, block)
end

#push(type, payload) ⇒ Object



24
25
26
27
28
# File 'lib/concurrent_pipeline/producer.rb', line 24

# Dispatches +payload+ to the handler registered for +type+.
#
# When #receivers has no entry (or a nil entry) for +type+, a warning is
# logged and nil is returned; the payload is not delivered anywhere.
#
# @param type [Object] key used to look up the handler in #receivers
# @param payload [Object] value passed to the handler as its only argument
# @return [Object, nil] the handler's return value, or nil if none was found
def push(type, payload)
  handler = receivers[type]

  # Test for nil specifically (not falsiness) so a non-nil entry is always called.
  if handler.nil?
    Log.warn("No stream handler for type: #{type.inspect}")
    return
  end

  handler.call(payload)
end