Class: Reacto::Subscriptions::ZippingSubscription

Inherits:
CompositeSubscription show all
Defined in:
lib/reacto/subscriptions/zipping_subscription.rb

Instance Method Summary collapse

Methods inherited from CompositeSubscription

#add, #add_resource, #after_on_value, #closed?, #initialize, #on_error, #on_open, #on_value, #unsubscribe

Methods included from Subscription

#add, #add_resource, #unsubscribe

Constructor Details

This class inherits a constructor from Reacto::Subscriptions::CompositeSubscription

Instance Method Details

#current_valueObject



8
9
10
# File 'lib/reacto/subscriptions/zipping_subscription.rb', line 8

def current_value
  @current_value ||= 0
end

#on_closeObject



29
30
31
32
# File 'lib/reacto/subscriptions/zipping_subscription.rb', line 29

def on_close
  return unless subscribed?
  @subscriber.on_close
end

#on_value_subscriptions(_) ⇒ Object



20
21
22
23
24
25
26
27
# File 'lib/reacto/subscriptions/zipping_subscription.rb', line 20

def on_value_subscriptions(_)
  @subscriber.on_value(
    @combinator.call(
      *@subscriptions.map { |sub| sub.buffer[current_value] }
    )
  )
  @current_value += 1
end

#subscribed?Boolean

Returns:

  • (Boolean)


12
13
14
# File 'lib/reacto/subscriptions/zipping_subscription.rb', line 12

def subscribed?
  @subscriptions.all? { |s| s.subscribed? }
end

#subscription!Object



34
35
36
37
38
39
# File 'lib/reacto/subscriptions/zipping_subscription.rb', line 34

def subscription!
  subscription = BufferedSubscription.new(self)
  @subscriptions << subscription

  subscription
end

#waiting?Boolean

Returns:

  • (Boolean)


16
17
18
# File 'lib/reacto/subscriptions/zipping_subscription.rb', line 16

def waiting?
  @subscriptions.any? { |sub| sub.buffer[current_value] == NO_VALUE }
end