Class: Reacto::Subscriptions::ExecutorSubscription
- Inherits: Object
- Inheritance hierarchy: Object → Reacto::Subscriptions::ExecutorSubscription
- Includes:
- Subscription
- Defined in:
- lib/reacto/subscriptions/executor_subscription.rb
Instance Method Summary
- #add(subscription) ⇒ Object
- #add_resource(resource) ⇒ Object
- #initialize(subscription, executor) ⇒ ExecutorSubscription (constructor): A new instance of ExecutorSubscription.
- #on_close ⇒ Object
- #on_error(error) ⇒ Object
- #on_open ⇒ Object
- #on_value(v) ⇒ Object
- #subscribed? ⇒ Boolean
- #unsubscribe ⇒ Object
Constructor Details
#initialize(subscription, executor) ⇒ ExecutorSubscription
Returns a new instance of ExecutorSubscription.
9 10 11 12 13 |
# File 'lib/reacto/subscriptions/executor_subscription.rb', line 9

# Builds a subscription that forwards notifications to +subscription+
# by posting them to +executor+.
#
# @param subscription [Object] the subscription being wrapped; stored as
#   @wrapped
# @param executor [Object] the executor that notifications are posted to;
#   stored as @executor
def initialize(subscription, executor)
  @wrapped = subscription
  @executor = executor
  # Starts out open; on_close / on_error arrange for this to become true.
  @closed = false
end
Instance Method Details
#add(subscription) ⇒ Object
27 28 29 |
# File 'lib/reacto/subscriptions/executor_subscription.rb', line 27

# Delegates to the wrapped subscription's +add+.
#
# @param subscription [Object] the subscription to attach to the wrapped one
# @return [Object] whatever the wrapped subscription's +add+ returns
def add(subscription)
  @wrapped.add(subscription)
end
#add_resource(resource) ⇒ Object
31 32 33 |
# File 'lib/reacto/subscriptions/executor_subscription.rb', line 31

# Delegates to the wrapped subscription's +add_resource+.
#
# @param resource [Object] the resource to hand to the wrapped subscription
# @return [Object] whatever the wrapped subscription's +add_resource+ returns
def add_resource(resource)
  @wrapped.add_resource(resource)
end
#on_close ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/reacto/subscriptions/executor_subscription.rb', line 45

# Forwards a close notification to the wrapped subscription through the
# executor.
#
# Does nothing when +subscribed?+ is false or this wrapper is already
# marked closed.
#
# @return [Object, nil] the result of posting the wrapped +on_close+ to the
#   executor, or nil when the notification is ignored
def on_close
  return unless subscribed? && !@closed

  # Clean-up hook attached to the wrapped subscription: it posts
  # +unsubscribe+ to the executor and then flags this wrapper as closed.
  # NOTE(review): presumably the hook fires when the wrapped subscription
  # is closed or receives an error — confirm against
  # Subscriptions.on_close_and_error.
  cleanup = Subscriptions.on_close_and_error do
    @executor.post(&method(:unsubscribe))
    @closed = true
  end
  @wrapped.add(cleanup)

  close_callback = @wrapped.method(:on_close)
  @executor.post(&close_callback)
end
#on_error(error) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/reacto/subscriptions/executor_subscription.rb', line 57

# Forwards an error notification to the wrapped subscription through the
# executor.
#
# Does nothing when +subscribed?+ is false or this wrapper is already
# marked closed.
#
# @param error [Object] the error passed on to the wrapped +on_error+
# @return [Object, nil] the result of posting the wrapped +on_error+ to the
#   executor, or nil when the notification is ignored
def on_error(error)
  return unless subscribed? && !@closed

  # Clean-up hook attached to the wrapped subscription: it posts
  # +unsubscribe+ to the executor and then flags this wrapper as closed.
  # NOTE(review): presumably the hook fires when the wrapped subscription
  # is closed or receives an error — confirm against
  # Subscriptions.on_close_and_error.
  cleanup = Subscriptions.on_close_and_error do
    @executor.post(&method(:unsubscribe))
    @closed = true
  end
  @wrapped.add(cleanup)

  error_callback = @wrapped.method(:on_error)
  @executor.post(error, &error_callback)
end
#on_open ⇒ Object
35 36 37 |
# File 'lib/reacto/subscriptions/executor_subscription.rb', line 35

# Posts the wrapped subscription's +on_open+ to the executor.
#
# Unlike the other notifications, this one is posted without checking
# +subscribed?+ or the closed flag.
#
# @return [Object] whatever the executor's +post+ returns
def on_open
  open_callback = @wrapped.method(:on_open)
  @executor.post(&open_callback)
end
#on_value(v) ⇒ Object
39 40 41 42 43 |
# File 'lib/reacto/subscriptions/executor_subscription.rb', line 39

# Forwards a value to the wrapped subscription through the executor.
#
# The value is dropped when +subscribed?+ is false or this wrapper is
# already marked closed.
#
# @param v [Object] the value passed on to the wrapped +on_value+
# @return [Object, nil] the result of posting to the executor, or nil when
#   the value is dropped
def on_value(v)
  return unless subscribed? && !@closed

  value_callback = @wrapped.method(:on_value)
  @executor.post(v, &value_callback)
end
#subscribed? ⇒ Boolean
15 16 17 18 19 |
# File 'lib/reacto/subscriptions/executor_subscription.rb', line 15

# Reports whether the executor is still running.
#
# Has a side effect: when the wrapped subscription reports that it is no
# longer subscribed, +unsubscribe+ is called on this wrapper before the
# executor state is read.
#
# @return [Boolean] the executor's +running?+ value
def subscribed?
  wrapped_active = @wrapped.subscribed?
  unsubscribe unless wrapped_active

  @executor.running?
end
#unsubscribe ⇒ Object
21 22 23 24 25 |
# File 'lib/reacto/subscriptions/executor_subscription.rb', line 21

# Unsubscribes the wrapped subscription and schedules the executor's
# shutdown.
#
# The shutdown is posted to the executor rather than called directly, so
# it is submitted through the same +post+ path as the notifications.
#
# Safe to call repeatedly: +subscribed?+ invokes this method every time the
# wrapped subscription reports that it is unsubscribed, so it can run again
# after the executor has stopped. In that case nothing is posted.
# NOTE(review): a stopped executor may reject or raise on new work
# depending on its fallback policy — confirm for the executors used here.
#
# @return [Object, false] the result of posting the shutdown, or false when
#   the executor is no longer running
def unsubscribe
  @wrapped.unsubscribe
  return false unless @executor.running?

  @executor.post(&@executor.method(:shutdown))
end