Class: Reacto::Subscriptions::CompositeSubscription

Inherits:
Object
  • Object
show all
Includes:
Subscription
Defined in:
lib/reacto/subscriptions/composite_subscription.rb

Instance Method Summary collapse

Constructor Details

#initialize(combinator, subscriber) ⇒ CompositeSubscription

Returns a new instance of CompositeSubscription.



10
11
12
13
14
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 10

def initialize(combinator, subscriber)
  @combinator = combinator
  @subscriptions = []
  @subscriber = subscriber
end

Instance Method Details

#add(_) ⇒ Object



25
26
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 25

def add(_)
end

#add_resource(_) ⇒ Object



28
29
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 28

def add_resource(_)
end

#after_on_value(_) ⇒ Object



56
57
58
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 56

def after_on_value(_)
  #nothing by default
end

#closed?Boolean

Returns:

  • (Boolean)


67
68
69
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 67

def closed?
  @subscriptions.all? { |s| s.closed? }
end

#on_closeObject



71
72
73
74
75
76
77
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 71

def on_close
  return unless subscribed?
  return if @subscriptions.any? { |s| !s.closed? }

  @subscriber.on_close
  unsubscribe
end

#on_error(e) ⇒ Object



60
61
62
63
64
65
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 60

def on_error(e)
  # Introduce a multi-error and not call on_error right away when there is
  # an error and an option is set?
  return unless subscribed?
  @subscriber.on_error(e)
end

#on_openObject



31
32
33
34
35
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 31

def on_open
  return unless subscribed?
  return unless @subscriptions.any? { |s| !s.active? }
  @subscriber.on_open
end

#on_value(val) ⇒ Object



41
42
43
44
45
46
47
48
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 41

def on_value(val)
  return unless subscribed?
  return if waiting?


  on_value_subscriptions(val)
  after_on_value(val)
end

#on_value_subscriptions(_) ⇒ Object



50
51
52
53
54
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 50

def on_value_subscriptions(_)
  @subscriber.on_value(
    @combinator.call(*@subscriptions.map(&:last_value))
  )
end

#subscribed?Boolean

Returns:

  • (Boolean)


16
17
18
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 16

def subscribed?
  @subscriptions.any? { |s| s.subscribed? }
end

#subscription!Object



79
80
81
82
83
84
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 79

def subscription!
  subscription = InnerSubscription.new(self)
  @subscriptions << subscription

  subscription
end

#unsubscribeObject



20
21
22
23
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 20

def unsubscribe
  @subscriptions.each(&:unsubscribe)
  @subscriptions = []
end

#waiting?Boolean

Returns:

  • (Boolean)


37
38
39
# File 'lib/reacto/subscriptions/composite_subscription.rb', line 37

def waiting?
  @subscriptions.map(&:last_value).any? { |v| v == NO_VALUE }
end