Class: Reacto::Subscriptions::CompositeSubscription
- Inherits:
- Object
- Inheritance chain: Object → Reacto::Subscriptions::CompositeSubscription
- Includes:
- Subscription
- Defined in:
- lib/reacto/subscriptions/composite_subscription.rb
Instance Method Summary
collapse
Constructor Details
Returns a new instance of CompositeSubscription.
10
11
12
13
14
|
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 10
# Sets up a composite that starts out with no inner subscriptions.
#
# @param combinator [#call] called with the inner subscriptions' last values
#   whenever a combined value is emitted
# @param subscriber [Object] receives the forwarded on_open, on_value,
#   on_error and on_close notifications
def initialize(combinator, subscriber)
  # Inner subscriptions are registered later through #subscription!.
  @subscriptions = []
  @combinator = combinator
  @subscriber = subscriber
end
|
Instance Method Details
#add(_) ⇒ Object
25
26
|
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 25
# No-op: the argument is accepted and ignored.
# NOTE(review): presumably present to satisfy the included Subscription
# interface; confirm against that module.
#
# @param _ [Object] ignored
# @return [nil]
def add(_); end
|
#add_resource(_) ⇒ Object
28
29
|
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 28
# No-op: the argument is accepted and ignored.
# NOTE(review): presumably present to satisfy the included Subscription
# interface; confirm against that module.
#
# @param _ [Object] ignored
# @return [nil]
def add_resource(_); end
|
#after_on_value(_) ⇒ Object
56
57
58
|
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 56
# Hook that #on_value calls after a combined value has been sent to the
# subscriber. This implementation does nothing with it.
#
# @param _ [Object] the value that was passed to #on_value; ignored
# @return [nil]
def after_on_value(_); end
|
#closed? ⇒ Boolean
67
68
69
|
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 67
# Tells whether every inner subscription reports itself closed.
# With no inner subscriptions at all this is true.
#
# @return [Boolean]
def closed?
  @subscriptions.all?(&:closed?)
end
|
#on_close ⇒ Object
71
72
73
74
75
76
77
|
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 71
# Forwards the close notification to the subscriber once every inner
# subscription reports closed, then unsubscribes the whole composite.
#
# Nothing happens when no inner subscription is subscribed, or while at
# least one inner subscription is still open.
#
# @return [Object, nil] whatever #unsubscribe returns, or nil when skipped
def on_close
  return unless subscribed?
  # Hold back the close until the last inner subscription has closed.
  return unless @subscriptions.all?(&:closed?)

  @subscriber.on_close
  unsubscribe
end
|
#on_error(e) ⇒ Object
60
61
62
63
64
65
|
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 60
# Passes an error on to the subscriber, provided at least one inner
# subscription is still subscribed; otherwise the error is dropped.
#
# @param e [Object] the error to forward
# @return [Object, nil] the subscriber's on_error result, or nil when dropped
def on_error(e)
  @subscriber.on_error(e) if subscribed?
end
|
#on_open ⇒ Object
31
32
33
34
35
|
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 31
# Forwards on_open to the subscriber when the composite is subscribed and
# at least one inner subscription answers false to `active?`.
# NOTE(review): the exact meaning of `active?` is defined by the inner
# subscription class, which is not visible here; confirm.
#
# @return [Object, nil] the subscriber's on_open result, or nil when skipped
def on_open
  return unless subscribed?
  # Every inner subscription is already active: nothing to announce.
  return if @subscriptions.all?(&:active?)

  @subscriber.on_open
end
|
#on_value(val) ⇒ Object
41
42
43
44
45
46
47
48
|
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 41
# Reacts to an incoming value: emits a combined value through
# #on_value_subscriptions and then runs the #after_on_value hook.
#
# Skipped entirely when no inner subscription is subscribed, or while the
# composite is still #waiting?.
#
# @param val [Object] handed to #on_value_subscriptions and #after_on_value
# @return [Object, nil] the #after_on_value result, or nil when skipped
def on_value(val)
  # `||` short-circuits, so #waiting? is only consulted when subscribed.
  return if !subscribed? || waiting?

  on_value_subscriptions(val)
  after_on_value(val)
end
|
#on_value_subscriptions(_) ⇒ Object
50
51
52
53
54
|
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 50
# Combines the last value of every inner subscription and sends the result
# to the subscriber. The incoming argument itself is not used.
#
# @param _ [Object] ignored
# @return [Object] whatever the subscriber's on_value returns
def on_value_subscriptions(_)
  latest = @subscriptions.map(&:last_value)
  # The combinator receives one positional argument per inner subscription.
  combined = @combinator.call(*latest)
  @subscriber.on_value(combined)
end
|
#subscribed? ⇒ Boolean
16
17
18
|
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 16
# Tells whether at least one inner subscription is still subscribed.
# With no inner subscriptions at all this is false.
#
# @return [Boolean]
def subscribed?
  @subscriptions.any?(&:subscribed?)
end
|
#subscription! ⇒ Object
79
80
81
82
83
84
|
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 79
# Creates a new InnerSubscription bound to this composite, appends it to
# the list of inner subscriptions, and hands it back to the caller.
#
# @return [InnerSubscription] the newly registered inner subscription
def subscription!
  InnerSubscription.new(self).tap { |inner| @subscriptions << inner }
end
|
#unsubscribe ⇒ Object
20
21
22
23
|
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 20
# Unsubscribes every inner subscription, then replaces the list with a
# fresh empty array.
#
# @return [Array] the new, empty list of inner subscriptions
def unsubscribe
  current = @subscriptions
  current.each(&:unsubscribe)
  # Reassign rather than clear in place, so the old array is left untouched.
  @subscriptions = []
end
|
#waiting? ⇒ Boolean
37
38
39
|
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 37
# Tells whether any inner subscription's last value is still NO_VALUE.
# NOTE(review): presumably NO_VALUE marks "nothing received yet"; confirm
# where the constant is defined.
#
# Checks the inner subscriptions directly and stops at the first match, so
# no intermediate array of last values is built on each call.
#
# @return [Boolean]
def waiting?
  @subscriptions.any? { |inner| inner.last_value == NO_VALUE }
end
|