Class: Reacto::Subscriptions::ZippingSubscription
Instance Method Summary
collapse
#add, #add_resource, #after_on_value, #closed?, #initialize, #on_error, #on_open, #on_value, #unsubscribe
#add, #add_resource, #unsubscribe
Instance Method Details
#current_value ⇒ Object
8
9
10
|
# File 'lib/reacto/subscriptions/zipping_subscription.rb', line 8
def current_value
@current_value ||= 0
end
|
#on_close ⇒ Object
29
30
31
32
|
# File 'lib/reacto/subscriptions/zipping_subscription.rb', line 29
def on_close
return unless subscribed?
@subscriber.on_close
end
|
#on_value_subscriptions(_) ⇒ Object
20
21
22
23
24
25
26
27
|
# File 'lib/reacto/subscriptions/zipping_subscription.rb', line 20
def on_value_subscriptions(_)
@subscriber.on_value(
@combinator.call(
*@subscriptions.map { |sub| sub.buffer[current_value] }
)
)
@current_value += 1
end
|
#subscribed? ⇒ Boolean
12
13
14
|
# File 'lib/reacto/subscriptions/zipping_subscription.rb', line 12
def subscribed?
@subscriptions.all? { |s| s.subscribed? }
end
|
#subscription! ⇒ Object
34
35
36
37
38
39
|
# File 'lib/reacto/subscriptions/zipping_subscription.rb', line 34
def subscription!
subscription = BufferedSubscription.new(self)
@subscriptions << subscription
subscription
end
|
#waiting? ⇒ Boolean
16
17
18
|
# File 'lib/reacto/subscriptions/zipping_subscription.rb', line 16
def waiting?
@subscriptions.any? { |sub| sub.buffer[current_value] == NO_VALUE }
end
|