Class: Reacto::Trackable
- Inherits:
-
Object
- Object
- Reacto::Trackable
- Defined in:
- lib/reacto/trackable.rb
Constant Summary collapse
- TOPICS =
[:open, :value, :error, :close]
Class Method Summary collapse
- .close(executor = nil) ⇒ Object
- .combine(*trackables, &block) ⇒ Object
- .combine_last(*trackables, &block) ⇒ Object
- .combine_with(function, *trackables) ⇒ Object
- .enumerable(enumerable, executor = nil) ⇒ Object
- .error(err, executor = nil) ⇒ Object
- .interval(interval, enumerator = Behaviours.integers_enumerator, executor: nil) ⇒ Object
- .later(secs, value, executor: Reacto::Executors.tasks) ⇒ Object
- .make(behaviour = NO_ACTION, executor = nil, &block) ⇒ Object
- .never ⇒ Object
- .repeat(array, int: 0.1, executor: nil) ⇒ Object
- .value(value, executor = nil) ⇒ Object
- .zip(*trackables, &block) ⇒ Object
Instance Method Summary collapse
- #[](x) ⇒ Object
- #await(subscription, timeout = nil) ⇒ Object
- #buffer(count: nil, delay: nil) ⇒ Object
- #cache(type: :memory, **settings) ⇒ Object
- #concat(trackable) ⇒ Object
- #delay(delay) ⇒ Object
- #diff(initial = NO_VALUE, fn = Operations::Diff::DEFAULT_FN, &block) ⇒ Object
- #do_track(subscription) ⇒ Object
- #drop(how_many_to_drop) ⇒ Object (also: #skip)
- #drop_errors ⇒ Object (also: #skip_errors)
- #execute_on(executor) ⇒ Object
- #first ⇒ Object
- #flat_map(transform = nil, &block) ⇒ Object
- #flat_map_latest(transform = nil, &block) ⇒ Object
- #flatten ⇒ Object
-
#initialize(behaviour = NO_ACTION, executor = nil, &block) ⇒ Trackable
constructor
A new instance of Trackable.
- #inject(initial = NO_VALUE, injector = nil, &block) ⇒ Object
- #last ⇒ Object
- #lift(operation = nil, &block) ⇒ Object
- #map(mapping = nil, error: nil, close: nil, &block) ⇒ Object
- #merge(trackable, delay_error: false) ⇒ Object
- #off(notification_tracker) ⇒ Object
- #on(trackers = {}) ⇒ Object
- #prepend(enumerable) ⇒ Object
- #select(filter = nil, &block) ⇒ Object
- #take(how_many_to_take) ⇒ Object
- #throttle(delay) ⇒ Object
- #track(notification_tracker) ⇒ Object
- #track_on(executor) ⇒ Object
- #uniq ⇒ Object
Constructor Details
#initialize(behaviour = NO_ACTION, executor = nil, &block) ⇒ Trackable
Returns a new instance of Trackable.
168 169 170 171 |
# File 'lib/reacto/trackable.rb', line 168 def initialize(behaviour = NO_ACTION, executor = nil, &block) @behaviour = block_given? ? block : behaviour @executor = executor end |
Class Method Details
.close(executor = nil) ⇒ Object
36 37 38 39 40 |
# File 'lib/reacto/trackable.rb', line 36 def close(executor = nil) make(nil, executor) do |subscriber| subscriber.on_close end end |
.combine(*trackables, &block) ⇒ Object
20 21 22 23 24 |
# File 'lib/reacto/trackable.rb', line 20 def combine(*trackables, &block) combine_create( Subscriptions::CombiningSubscription, *trackables, &block ) end |
.combine_last(*trackables, &block) ⇒ Object
26 27 28 29 30 |
# File 'lib/reacto/trackable.rb', line 26 def combine_last(*trackables, &block) combine_create( Subscriptions::CombiningLastSubscription, *trackables, &block ) end |
.combine_with(function, *trackables) ⇒ Object
153 154 |
# File 'lib/reacto/trackable.rb', line 153 def combine_with(function, *trackables) end |
.enumerable(enumerable, executor = nil) ⇒ Object
138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/reacto/trackable.rb', line 138 def enumerable(enumerable, executor = nil) make(nil, executor) do |tracker| begin enumerable.each do |val| break unless tracker.subscribed? tracker.on_value(val) end tracker.on_close if tracker.subscribed? rescue => error tracker.on_error(error) if tracker.subscribed? end end end |
.error(err, executor = nil) ⇒ Object
42 43 44 45 46 |
# File 'lib/reacto/trackable.rb', line 42 def error(err, executor = nil) make(nil, executor) do |subscriber| subscriber.on_error(err) end end |
.interval(interval, enumerator = Behaviours.integers_enumerator, executor: nil) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/reacto/trackable.rb', line 68 def interval( interval, enumerator = Behaviours.integers_enumerator, executor: nil ) if executor.is_a?(Concurrent::ImmediateExecutor) make do |tracker| Behaviours.with_close_and_error(tracker) do |subscriber| while subscriber.subscribed? sleep interval if subscriber.subscribed? if subscriber.subscribed? begin subscriber.on_value(enumerator.next) rescue StopIteration break end else break end end end end else make do |tracker| queue = Queue.new task = Concurrent::TimerTask.new(execution_interval: interval) do queue.push('ready') end Thread::abort_on_exception = true thread = Thread.new do begin loop do queue.pop break unless tracker.subscribed? begin value = enumerator.next tracker.on_value(value) rescue StopIteration tracker.on_close if tracker.subscribed? break rescue StandardError => error tracker.on_error(error) if tracker.subscribed? break end end ensure task.shutdown end end task.execute tracker.add_resource(Reacto::Resources::ExecutorResource.new( task, threads: [thread] )) end end end |
.later(secs, value, executor: Reacto::Executors.tasks) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/reacto/trackable.rb', line 53 def later(secs, value, executor: Reacto::Executors.tasks) if executor.is_a?(Concurrent::ImmediateExecutor) make do |tracker| sleep secs Behaviours.single_tracker_value(tracker, value) end else make do |tracker| Concurrent::ScheduledTask.execute(secs, executor: executor) do Behaviours.single_tracker_value(tracker, value) end end end end |
.make(behaviour = NO_ACTION, executor = nil, &block) ⇒ Object
48 49 50 51 |
# File 'lib/reacto/trackable.rb', line 48 def make(behaviour = NO_ACTION, executor = nil, &block) behaviour = block_given? ? block : behaviour self.new(behaviour, executor) end |
.never ⇒ Object
16 17 18 |
# File 'lib/reacto/trackable.rb', line 16 def never self.new end |
.repeat(array, int: 0.1, executor: nil) ⇒ Object
128 129 130 131 132 |
# File 'lib/reacto/trackable.rb', line 128 def repeat(array, int: 0.1, executor: nil) interval( int, Behaviours.array_repeat_enumerator(array), executor: executor ) end |
.value(value, executor = nil) ⇒ Object
134 135 136 |
# File 'lib/reacto/trackable.rb', line 134 def value(value, executor = nil) make(Behaviours.single_value(value), executor) end |
.zip(*trackables, &block) ⇒ Object
32 33 34 |
# File 'lib/reacto/trackable.rb', line 32 def zip(*trackables, &block) combine_create(Subscriptions::ZippingSubscription, *trackables, &block) end |
Instance Method Details
#[](x) ⇒ Object
257 258 259 |
# File 'lib/reacto/trackable.rb', line 257 def [](x) lift(Operations::Drop.new(x, 1)) end |
#await(subscription, timeout = nil) ⇒ Object
302 303 304 305 306 |
# File 'lib/reacto/trackable.rb', line 302 def await(subscription, timeout = nil) latch = Concurrent::CountDownLatch.new(1) subscription.add(Subscriptions.on_close_and_error { latch.count_down }) latch.wait(timeout) end |
#buffer(count: nil, delay: nil) ⇒ Object
277 278 279 |
# File 'lib/reacto/trackable.rb', line 277 def buffer(count: nil, delay: nil) lift(Operations::Buffer.new(count: count, delay: delay)) end |
#cache(type: :memory, **settings) ⇒ Object
289 290 291 292 |
# File 'lib/reacto/trackable.rb', line 289 def cache(type: :memory, **settings) settings ||= {} lift(Operations::Cache.new(type: type, **settings)) end |
#concat(trackable) ⇒ Object
269 270 271 |
# File 'lib/reacto/trackable.rb', line 269 def concat(trackable) lift(Operations::Concat.new(trackable)) end |
#delay(delay) ⇒ Object
281 282 283 |
# File 'lib/reacto/trackable.rb', line 281 def delay(delay) buffer(delay: delay) end |
#diff(initial = NO_VALUE, fn = Operations::Diff::DEFAULT_FN, &block) ⇒ Object
229 230 231 |
# File 'lib/reacto/trackable.rb', line 229 def diff(initial = NO_VALUE, fn = Operations::Diff::DEFAULT_FN, &block) lift(Operations::Diff.new(block_given? ? block : fn, initial)) end |
#do_track(subscription) ⇒ Object
311 312 313 314 315 316 317 |
# File 'lib/reacto/trackable.rb', line 311 def do_track(subscription) if @executor @executor.post(subscription, &@behaviour) else @behaviour.call(subscription) end end |
#drop(how_many_to_drop) ⇒ Object Also known as: skip
233 234 235 |
# File 'lib/reacto/trackable.rb', line 233 def drop(how_many_to_drop) lift(Operations::Drop.new(how_many_to_drop)) end |
#drop_errors ⇒ Object Also known as: skip_errors
237 238 239 |
# File 'lib/reacto/trackable.rb', line 237 def drop_errors lift(Operations::DropErrors.new) end |
#execute_on(executor) ⇒ Object
298 299 300 |
# File 'lib/reacto/trackable.rb', line 298 def execute_on(executor) Trackable.new(@behaviour, executor) end |
#first ⇒ Object
253 254 255 |
# File 'lib/reacto/trackable.rb', line 253 def first take(1) end |
#flat_map(transform = nil, &block) ⇒ Object
207 208 209 |
# File 'lib/reacto/trackable.rb', line 207 def flat_map(transform = nil, &block) lift(Operations::FlatMap.new(block_given? ? block : transform)) end |
#flat_map_latest(transform = nil, &block) ⇒ Object
211 212 213 |
# File 'lib/reacto/trackable.rb', line 211 def flat_map_latest(transform = nil, &block) lift(Operations::FlatMapLatest.new(block_given? ? block : transform)) end |
#flatten ⇒ Object
249 250 251 |
# File 'lib/reacto/trackable.rb', line 249 def flatten lift(Operations::Flatten.new) end |
#inject(initial = NO_VALUE, injector = nil, &block) ⇒ Object
225 226 227 |
# File 'lib/reacto/trackable.rb', line 225 def inject(initial = NO_VALUE, injector = nil, &block) lift(Operations::Inject.new(block_given? ? block : injector, initial)) end |
#last ⇒ Object
261 262 263 |
# File 'lib/reacto/trackable.rb', line 261 def last lift(Operations::Last.new) end |
#lift(operation = nil, &block) ⇒ Object
195 196 197 198 199 200 201 202 203 204 205 |
# File 'lib/reacto/trackable.rb', line 195 def lift(operation = nil, &block) operation = block_given? ? block : operation Trackable.new(nil, @executor) do |tracker_subscription| begin modified = operation.call(tracker_subscription) lift_behaviour(modified) unless modified == NOTHING rescue Exception => e tracker_subscription.on_error(e) end end end |
#map(mapping = nil, error: nil, close: nil, &block) ⇒ Object
215 216 217 218 219 |
# File 'lib/reacto/trackable.rb', line 215 def map(mapping = nil, error: nil, close: nil, &block) lift(Operations::Map.new( block_given? ? block : mapping, error: error, close: close )) end |
#merge(trackable, delay_error: false) ⇒ Object
273 274 275 |
# File 'lib/reacto/trackable.rb', line 273 def merge(trackable, delay_error: false) lift(Operations::Merge.new(trackable, delay_error: delay_error)) end |
#off(notification_tracker) ⇒ Object
182 183 184 |
# File 'lib/reacto/trackable.rb', line 182 def off(notification_tracker) # Clean-up logic end |
#on(trackers = {}) ⇒ Object
173 174 175 176 177 178 179 180 |
# File 'lib/reacto/trackable.rb', line 173 def on(trackers = {}) unless (trackers.keys - TOPICS).empty? raise "This Trackable supports only #{TOPICS}, " \ "but #{trackers.keys} were passed." end track(Tracker.new(trackers)) end |
#prepend(enumerable) ⇒ Object
265 266 267 |
# File 'lib/reacto/trackable.rb', line 265 def prepend(enumerable) lift(Operations::Prepend.new(enumerable)) end |
#select(filter = nil, &block) ⇒ Object
221 222 223 |
# File 'lib/reacto/trackable.rb', line 221 def select(filter = nil, &block) lift(Operations::Select.new(block_given? ? block : filter)) end |
#take(how_many_to_take) ⇒ Object
241 242 243 |
# File 'lib/reacto/trackable.rb', line 241 def take(how_many_to_take) lift(Operations::Take.new(how_many_to_take)) end |
#throttle(delay) ⇒ Object
285 286 287 |
# File 'lib/reacto/trackable.rb', line 285 def throttle(delay) lift(Operations::Throttle.new(delay)) end |
#track(notification_tracker) ⇒ Object
186 187 188 189 190 191 192 193 |
# File 'lib/reacto/trackable.rb', line 186 def track(notification_tracker) subscription = Subscriptions::TrackerSubscription.new(notification_tracker, self) do_track(subscription) Subscriptions::SubscriptionWrapper.new(subscription) end |
#track_on(executor) ⇒ Object
294 295 296 |
# File 'lib/reacto/trackable.rb', line 294 def track_on(executor) lift(Operations::TrackOn.new(executor)) end |
#uniq ⇒ Object
245 246 247 |
# File 'lib/reacto/trackable.rb', line 245 def uniq lift(Operations::Uniq.new) end |