Class: Reacto::Subscriptions::BufferedSubscription
- Inherits:
-
SimpleSubscription
- Object
- SimpleSubscription
- Reacto::Subscriptions::BufferedSubscription
- Includes:
- Subscription
- Defined in:
- lib/reacto/subscriptions/buffered_subscription.rb
Instance Attribute Summary collapse
-
#buffer ⇒ Object
Returns the value of attribute buffer.
-
#last_error ⇒ Object
Returns the value of attribute last_error.
Attributes inherited from SimpleSubscription
Instance Method Summary collapse
- #active? ⇒ Boolean
- #closed? ⇒ Boolean
-
#initialize(parent) ⇒ BufferedSubscription
constructor
A new instance of BufferedSubscription.
Methods included from Subscription
#add, #add_resource, #subscribed?, #unsubscribe
Methods inherited from SimpleSubscription
#add, #add_resource, #on_close, #on_error, #on_open, #on_value, #subscribed?, #unsubscribe
Constructor Details
#initialize(parent) ⇒ BufferedSubscription
Returns a new instance of BufferedSubscription.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/reacto/subscriptions/buffered_subscription.rb', line 10 def initialize(parent) @parent = parent @closed = false @active = false @buffer = Hash.new(NO_VALUE) @current_index = Concurrent::AtomicFixnum.new(0) @last_error = nil open = -> () do @active = true @parent.on_open end value = -> (v) do @buffer[@current_index.value] = v @current_index.increment @parent.on_value(v) end error = -> (e) do @last_error = e @parent.on_error(e) end close = -> () do @closed = true @parent.on_close end super(open: open, value: value, error: error, close: close) end |
Instance Attribute Details
#buffer ⇒ Object
Returns the value of attribute buffer.
8 9 10 |
# File 'lib/reacto/subscriptions/buffered_subscription.rb', line 8 def buffer @buffer end |
#last_error ⇒ Object
Returns the value of attribute last_error.
8 9 10 |
# File 'lib/reacto/subscriptions/buffered_subscription.rb', line 8 def last_error @last_error end |
Instance Method Details
#active? ⇒ Boolean
44 45 46 |
# File 'lib/reacto/subscriptions/buffered_subscription.rb', line 44 def active? @active end |
#closed? ⇒ Boolean
48 49 50 |
# File 'lib/reacto/subscriptions/buffered_subscription.rb', line 48 def closed? @closed end |