Class: Reacto::Subscriptions::ExecutorSubscription

Inherits:
Object
  • Object
show all
Includes:
Subscription
Defined in:
lib/reacto/subscriptions/executor_subscription.rb

Instance Method Summary

Constructor Details

#initialize(subscription, executor) ⇒ ExecutorSubscription

Returns a new instance of ExecutorSubscription.



9
10
11
12
13
# File 'lib/reacto/subscriptions/executor_subscription.rb', line 9

# Builds a subscription that wraps another subscription together with the
# executor used by the notification methods of this class.
#
# @param subscription [Object] the subscription to wrap; the other methods
#   of this class delegate to it (+add+, +on_value+, +unsubscribe+, ...).
# @param executor [Object] the object notifications are posted to; the
#   other methods of this class call +post+, +running?+ and +shutdown+ on it.
def initialize(subscription, executor)
  @wrapped = subscription
  @executor = executor
  # Starts open; the teardown hooks built in #on_close / #on_error set this
  # to true.
  @closed = false
end

Instance Method Details

#add(subscription) ⇒ Object



27
28
29
# File 'lib/reacto/subscriptions/executor_subscription.rb', line 27

# Forwards a subscription to the wrapped subscription's +add+.
#
# @param subscription [Object] passed through unchanged.
# @return [Object] whatever the wrapped subscription's +add+ returns.
def add(subscription)
  delegate = @wrapped
  delegate.add(subscription)
end

#add_resource(resource) ⇒ Object



31
32
33
# File 'lib/reacto/subscriptions/executor_subscription.rb', line 31

# Forwards a resource to the wrapped subscription's +add_resource+.
#
# @param resource [Object] passed through unchanged.
# @return [Object] whatever the wrapped subscription's +add_resource+ returns.
def add_resource(resource)
  delegate = @wrapped
  delegate.add_resource(resource)
end

#on_close ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
# File 'lib/reacto/subscriptions/executor_subscription.rb', line 45

# Posts the wrapped subscription's +on_close+ to the executor.
#
# Does nothing when #subscribed? is false or when this subscription has
# already been marked closed.
#
# @return [Object, nil] the result of +@executor.post+, or nil when skipped.
def on_close
  return unless subscribed?
  return if @closed

  # Register the teardown hook before posting the close notification.
  # NOTE(review): Subscriptions.on_close_and_error is defined elsewhere;
  # presumably it returns a subscription that runs this block when it is
  # closed or receives an error - confirm.
  teardown = Subscriptions.on_close_and_error do
    @executor.post(&method(:unsubscribe))
    @closed = true
  end
  @wrapped.add(teardown)

  close_wrapped = @wrapped.method(:on_close)
  @executor.post(&close_wrapped)
end

#on_error(error) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
# File 'lib/reacto/subscriptions/executor_subscription.rb', line 57

# Posts the wrapped subscription's +on_error+, with the given error, to the
# executor.
#
# Does nothing when #subscribed? is false or when this subscription has
# already been marked closed.
#
# @param error [Object] the error handed to the wrapped +on_error+.
# @return [Object, nil] the result of +@executor.post+, or nil when skipped.
def on_error(error)
  return unless subscribed?
  return if @closed

  # Register the teardown hook before posting the error notification.
  # NOTE(review): Subscriptions.on_close_and_error is defined elsewhere;
  # presumably it returns a subscription that runs this block when it is
  # closed or receives an error - confirm.
  teardown = Subscriptions.on_close_and_error do
    @executor.post(&method(:unsubscribe))
    @closed = true
  end
  @wrapped.add(teardown)

  fail_wrapped = @wrapped.method(:on_error)
  @executor.post(error, &fail_wrapped)
end

#on_open ⇒ Object



35
36
37
# File 'lib/reacto/subscriptions/executor_subscription.rb', line 35

# Posts the wrapped subscription's +on_open+ to the executor.
#
# Unlike #on_value, #on_close and #on_error, this does not check
# #subscribed? or the closed flag first.
#
# @return [Object] the result of +@executor.post+.
def on_open
  open_wrapped = @wrapped.method(:on_open)
  @executor.post(&open_wrapped)
end

#on_value(v) ⇒ Object



39
40
41
42
43
# File 'lib/reacto/subscriptions/executor_subscription.rb', line 39

# Posts the wrapped subscription's +on_value+, with the given value, to the
# executor.
#
# Does nothing when #subscribed? is false or when this subscription has
# already been marked closed.
#
# @param v [Object] the value handed to the wrapped +on_value+.
# @return [Object, nil] the result of +@executor.post+, or nil when skipped.
def on_value(v)
  return unless subscribed?
  return if @closed

  deliver = @wrapped.method(:on_value)
  @executor.post(v, &deliver)
end

#subscribed? ⇒ Boolean

Returns:

  • (Boolean)


15
16
17
18
19
# File 'lib/reacto/subscriptions/executor_subscription.rb', line 15

# Reports whether notifications should still be delivered.
#
# When the wrapped subscription is no longer subscribed, this subscription
# is torn down via #unsubscribe and false is returned. Otherwise the answer
# is whether the executor is still running.
#
# @return [Boolean]
def subscribed?
  unless @wrapped.subscribed?
    unsubscribe
    # #unsubscribe only *posts* the shutdown to the executor, so
    # +@executor.running?+ may still be true at this point; answer false
    # explicitly instead of depending on when the shutdown is processed.
    return false
  end

  @executor.running?
end

#unsubscribe ⇒ Object



21
22
23
24
25
# File 'lib/reacto/subscriptions/executor_subscription.rb', line 21

# Unsubscribes the wrapped subscription, then posts the executor's own
# +shutdown+ to the executor.
#
# @return [Object] the result of +@executor.post+.
def unsubscribe
  @wrapped.unsubscribe

  shutdown = @executor.method(:shutdown)
  @executor.post(&shutdown)
end