Class: Reacto::Trackable

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/reacto/trackable.rb

Direct Known Subclasses

LabeledTrackable, SharedTrackable

Constant Summary collapse

# Notification topics that #on accepts as tracker keys. Frozen so the shared
# list cannot be altered at runtime.
TOPICS = %i(open value error close).freeze
# Symbolic names that may be given wherever an executor is expected; each is
# mapped to the executor returned by the matching Executors method.
# :background and :tasks share Executors.tasks; :immediate and :now share
# Executors.immediate.
EXECUTOR_ALIASES =
{
  new_thread: Executors.new_thread,
  background: Executors.tasks,
  tasks: Executors.tasks,
  io: Executors.io,
  current: Executors.current,
  immediate: Executors.immediate,
  now: Executors.immediate
}

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(executor = nil, &block) ⇒ Trackable

Returns a new instance of Trackable.



180
181
182
183
184
185
186
# File 'lib/reacto/trackable.rb', line 180

# Stores the behaviour block and the executor of the new Trackable.
#
# @param executor [Object, nil] an executor, or a key of EXECUTOR_ALIASES
#   which is replaced by the aliased executor
# @param block the behaviour to store; NO_ACTION is stored when none is given
def initialize(executor = nil, &block)
  aliased = EXECUTOR_ALIASES[executor]

  @executor = aliased || executor
  @behaviour = block || NO_ACTION
end

Class Method Details

.close(executor: nil) ⇒ Object



53
54
55
# File 'lib/reacto/trackable.rb', line 53

# Makes a trackable whose behaviour only calls on_close on its subscriber.
#
# @param executor [Object, nil] forwarded to make
# @return the result of make
def close(executor: nil)
  make(executor) do |subscriber|
    subscriber.on_close
  end
end

.combine(*trackables, &block) ⇒ Object



33
34
35
36
37
# File 'lib/reacto/trackable.rb', line 33

# Delegates to combine_create using Subscriptions::CombiningSubscription.
#
# @param trackables the trackables forwarded to combine_create
# @param block forwarded to combine_create
# @return the result of combine_create
def combine(*trackables, &block)
  subscription_type = Subscriptions::CombiningSubscription
  combine_create(subscription_type, *trackables, &block)
end

.combine_last(*trackables, &block) ⇒ Object



43
44
45
46
47
# File 'lib/reacto/trackable.rb', line 43

# Delegates to combine_create using Subscriptions::CombiningLastSubscription.
#
# @param trackables the trackables forwarded to combine_create
# @param block forwarded to combine_create
# @return the result of combine_create
def combine_last(*trackables, &block)
  subscription_type = Subscriptions::CombiningLastSubscription
  combine_create(subscription_type, *trackables, &block)
end

.combine_latest ⇒ Object



166
167
168
169
170
# File 'lib/reacto/trackable.rb', line 166

# Delegates to combine_create using Subscriptions::CombiningSubscription.
#
# @param trackables the trackables forwarded to combine_create
# @param block forwarded to combine_create
# @return the result of combine_create
def combine(*trackables, &block)
  subscription_type = Subscriptions::CombiningSubscription
  combine_create(subscription_type, *trackables, &block)
end

.concat(*trackables) ⇒ Object



39
40
41
# File 'lib/reacto/trackable.rb', line 39

# Folds the given trackables from left to right, calling #concat on the
# accumulated one with each following one.
#
# @param trackables the objects to chain
# @return the folded result; nil when no trackables are given
def concat(*trackables)
  trackables.reduce do |joined, following|
    joined.concat(following)
  end
end

.enumerable(enumerable, executor: nil) ⇒ Object



162
163
164
# File 'lib/reacto/trackable.rb', line 162

# Makes a trackable whose behaviour is Behaviours.enumerable(enumerable).
#
# @param enumerable the object handed to Behaviours.enumerable
# @param executor [Object, nil] forwarded to make
# @return the result of make
def enumerable(enumerable, executor: nil)
  behaviour = Behaviours.enumerable(enumerable)
  make(executor, &behaviour)
end

.error(err, executor: nil) ⇒ Object



57
58
59
60
61
# File 'lib/reacto/trackable.rb', line 57

# Makes a trackable whose behaviour only calls on_error(err) on its
# subscriber.
#
# @param err the error passed to on_error
# @param executor [Object, nil] forwarded to make
# @return the result of make
def error(err, executor: nil)
  make(executor) { |subscriber| subscriber.on_error(err) }
end

.interval(interval, enumerator = Behaviours.integers_enumerator, executor: nil) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/reacto/trackable.rb', line 88

# Builds a trackable that emits successive values of +enumerator+, paced by
# +interval+ (given to sleep in one branch and used as the TimerTask's
# execution_interval in the other).
#
# The executor, after the EXECUTOR_ALIASES lookup, only selects which of the
# two strategies is used; neither branch passes it on to make.
#
# @param interval [Numeric] pause between emissions
# @param enumerator [#next] source of the emitted values; defaults to
#   Behaviours.integers_enumerator
# @param executor [Object, nil] an executor or a key of EXECUTOR_ALIASES
# @return the trackable produced by make
def interval(
  interval,
  enumerator = Behaviours.integers_enumerator,
  executor: nil
)
  # Swap a symbolic alias for the executor it names.
  stored = EXECUTOR_ALIASES[executor]
  executor = stored if stored

  if executor.is_a?(Concurrent::ImmediateExecutor)
    # Immediate executor: pace the emissions by sleeping inside the behaviour.
    make do |tracker|
      Behaviours.with_close_and_error(tracker) do |subscriber|
        while subscriber.subscribed?
          sleep interval if subscriber.subscribed?
          # Re-check after sleeping before emitting the next value.
          if subscriber.subscribed?
            begin
              subscriber.on_value(enumerator.next)
            rescue StopIteration
              # Enumerator exhausted: leave the loop.
              # NOTE(review): closing is presumably done by
              # Behaviours.with_close_and_error — confirm.
              break
            end
          else
            break
          end
        end
      end
    end
  else
    # Any other executor (including nil): a timer pushes one token per tick
    # and a dedicated thread emits one value per token.
    make do |tracker|
      # NOTE(review): this is a process-wide setting and is assigned every
      # time the behaviour runs.
      Thread.abort_on_exception = true

      queue = Queue.new
      task = Concurrent::TimerTask.new(execution_interval: interval) do
        queue.push('ready')
      end

      thread = Thread.new do
        begin
          loop do
            # Blocks until the timer has pushed a token.
            queue.pop
            break unless tracker.subscribed?

            begin
              value = enumerator.next
              tracker.on_value(value)
            rescue StopIteration
              # Enumerator exhausted: close the tracker if still subscribed.
              tracker.on_close if tracker.subscribed?
              break
            rescue StandardError => error
              tracker.on_error(error) if tracker.subscribed?
              break
            end
          end
        ensure
          # Stop the timer however the loop ended.
          task.shutdown
        end
      end
      task.execute

      # Hand the timer and the thread to the tracker; presumably so they are
      # released together with the subscription — confirm in ExecutorResource.
      tracker.add_resource(Reacto::Resources::ExecutorResource.new(
        task, threads: [thread]
      ))
    end
  end
end

.later(secs, value, executor: Reacto::Executors.tasks) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/reacto/trackable.rb', line 70

# Makes a trackable that hands +value+ to Behaviours.single_tracker_value
# after +secs+: by sleeping in place when the executor is a
# Concurrent::ImmediateExecutor, otherwise through a
# Concurrent::ScheduledTask run on the executor.
#
# @param secs [Numeric] the delay, given to sleep or to ScheduledTask.execute
# @param value the value passed to Behaviours.single_tracker_value
# @param executor an executor or a key of EXECUTOR_ALIASES; defaults to
#   Reacto::Executors.tasks
# @return the result of make
def later(secs, value, executor: Reacto::Executors.tasks)
  executor = EXECUTOR_ALIASES[executor] || executor
  run_inline = executor.is_a?(Concurrent::ImmediateExecutor)

  make do |tracker|
    if run_inline
      sleep secs
      Behaviours.single_tracker_value(tracker, value)
    else
      Concurrent::ScheduledTask.execute(secs, executor: executor) do
        Behaviours.single_tracker_value(tracker, value)
      end
    end
  end
end

.make(executor_param = nil, executor: nil, &block) ⇒ Object



63
64
65
66
67
68
# File 'lib/reacto/trackable.rb', line 63

# Instantiates the receiver with an executor and a behaviour.
#
# @param executor_param [Object, nil] positional executor; wins over the
#   keyword when truthy
# @param executor [Object, nil] keyword executor, used otherwise
# @param block the behaviour; NO_ACTION is used when none is given
# @return the new instance
def make(executor_param = nil, executor: nil,  &block)
  chosen_executor = executor_param || executor
  behaviour = block || NO_ACTION

  new(chosen_executor, &behaviour)
end

.never ⇒ Object



29
30
31
# File 'lib/reacto/trackable.rb', line 29

# Instantiates the receiver with no executor and no behaviour block.
#
# @return the new instance
def never
  new
end

.repeat(array, int: 0.1, executor: nil) ⇒ Object



152
153
154
155
156
# File 'lib/reacto/trackable.rb', line 152

# Calls interval with the enumerator returned by
# Behaviours.array_repeat_enumerator(array).
#
# @param array the object handed to Behaviours.array_repeat_enumerator
# @param int [Numeric] the interval passed on; 0.1 by default
# @param executor [Object, nil] forwarded to interval
# @return the result of interval
def repeat(array, int: 0.1, executor: nil)
  source_enumerator = Behaviours.array_repeat_enumerator(array)
  interval(int, source_enumerator, executor: executor)
end

.value(value, executor: nil) ⇒ Object



158
159
160
# File 'lib/reacto/trackable.rb', line 158

# Makes a trackable whose behaviour is Behaviours.single_value(value).
#
# @param value the object handed to Behaviours.single_value
# @param executor [Object, nil] forwarded to make
# @return the result of make
def value(value, executor: nil)
  behaviour = Behaviours.single_value(value)
  make(executor, &behaviour)
end

.zip(*trackables, &block) ⇒ Object



49
50
51
# File 'lib/reacto/trackable.rb', line 49

# Delegates to combine_create using Subscriptions::ZippingSubscription.
#
# @param trackables the trackables forwarded to combine_create
# @param block forwarded to combine_create
# @return the result of combine_create
def zip(*trackables, &block)
  subscription_type = Subscriptions::ZippingSubscription
  combine_create(subscription_type, *trackables, &block)
end

Instance Method Details

#[](x) ⇒ Object



521
522
523
# File 'lib/reacto/trackable.rb', line 521

# Lifts an Operations::Drop built with x and 1.
#
# @param x first argument given to Operations::Drop.new
# @return the result of lift
def [](x)
  operation = Operations::Drop.new(x, 1)
  lift(operation)
end

#act(on: Operations::Act::ALL, &block) ⇒ Object



579
580
581
# File 'lib/reacto/trackable.rb', line 579

# Lifts an Operations::Act built from the block and +on+.
#
# @param on second argument of Operations::Act.new; defaults to
#   Operations::Act::ALL
# @param block first argument of Operations::Act.new
# @return the result of lift
def act(on: Operations::Act::ALL, &block)
  operation = Operations::Act.new(block, on)
  lift(operation)
end

#all?(&block) ⇒ Boolean

Returns:

  • (Boolean)


188
189
190
# File 'lib/reacto/trackable.rb', line 188

# Lifts an Operations::BlockingEnumerable for :all? with the given block.
#
# @return the result of lift
def all?(&block)
  operation = Operations::BlockingEnumerable.new(:all?, block)
  lift(operation)
end

#any?(&block) ⇒ Boolean

Returns:

  • (Boolean)


192
193
194
# File 'lib/reacto/trackable.rb', line 192

# Lifts an Operations::BlockingEnumerable for :any? with the given block.
#
# @return the result of lift
def any?(&block)
  operation = Operations::BlockingEnumerable.new(:any?, block)
  lift(operation)
end

#append(to_append, condition: nil) ⇒ Object



533
534
535
# File 'lib/reacto/trackable.rb', line 533

# Lifts an Operations::Append built from to_append and the condition.
#
# @param to_append first argument of Operations::Append.new
# @param condition passed on as the condition: keyword
# @return the result of lift
def append(to_append, condition: nil)
  operation = Operations::Append.new(to_append, condition: condition)
  lift(operation)
end

#await(subscription, timeout = nil) ⇒ Object



655
656
657
658
659
660
661
662
663
664
# File 'lib/reacto/trackable.rb', line 655

# Blocks the caller on a count-down latch that is released by a
# Subscriptions.on_close_and_error subscription added to +subscription+.
# Returns at once when the subscription is no longer subscribed.
#
# Any exception whose message contains 'No live threads left' is swallowed;
# everything else is re-raised.
# NOTE(review): that message presumably comes from Ruby's deadlock detection,
# which would explain rescuing Exception rather than StandardError — confirm.
#
# @param subscription the subscription to wait on
# @param timeout [Numeric, nil] passed to the latch's wait
# @return the result of the latch's wait, or nil
def await(subscription, timeout = nil)
  return unless subscription.subscribed?

  done = Concurrent::CountDownLatch.new(1)
  release = Subscriptions.on_close_and_error { done.count_down }
  subscription.add(release)

  done.wait(timeout)
rescue Exception => e
  raise e unless e.message.include?('No live threads left')
end

#buffer(count: nil, delay: nil) ⇒ Object



545
546
547
# File 'lib/reacto/trackable.rb', line 545

# Lifts an Operations::Buffer built with the given count and delay.
#
# @param count passed on as the count: keyword
# @param delay passed on as the delay: keyword
# @return the result of lift
def buffer(count: nil, delay: nil)
  operation = Operations::Buffer.new(count: count, delay: delay)
  lift(operation)
end

#chunk(executor: nil, &block) ⇒ Object



223
224
225
226
227
228
229
230
# File 'lib/reacto/trackable.rb', line 223

# Lifts an Operations::Chunk built from the block. Returns self when no block
# is given.
#
# @param executor passed through retrieve_executor; when that yields nil the
#   trackable's own @executor is used
# @return self or the result of lift
def chunk(executor: nil, &block)
  return self unless block_given?

  resolved = retrieve_executor(executor)
  chosen = resolved.nil? ? @executor : resolved

  lift(Operations::Chunk.new(block, executor: chosen))
end

#chunk_while(executor: nil, &block) ⇒ Object



232
233
234
235
236
237
# File 'lib/reacto/trackable.rb', line 232

# Lifts an Operations::ChunkWhile built from the block.
#
# @param executor passed through retrieve_executor; when that yields nil the
#   trackable's own @executor is used
# @return the result of lift
def chunk_while(executor: nil, &block)
  resolved = retrieve_executor(executor)
  chosen = resolved.nil? ? @executor : resolved

  lift(Operations::ChunkWhile.new(block, executor: chosen))
end

#combine(*trackables, &block) ⇒ Object Also known as: combine_latest



609
610
611
612
613
# File 'lib/reacto/trackable.rb', line 609

# Calls the class-level combine with self placed before the given trackables.
# Returns self when no block is given.
#
# @param trackables the other trackables
# @return self or the result of the class-level combine
def combine(*trackables, &block)
  return self unless block_given?

  self.class.combine(self, *trackables, &block)
end

#combine_last(*trackables, &block) ⇒ Object



603
604
605
606
607
# File 'lib/reacto/trackable.rb', line 603

# Calls the class-level combine_last with self placed before the given
# trackables. Returns self when no block is given.
#
# @param trackables the other trackables
# @return self or the result of the class-level combine_last
def combine_last(*trackables, &block)
  return self unless block_given?

  self.class.combine_last(self, *trackables, &block)
end

#concat(trackable) ⇒ Object



537
538
539
# File 'lib/reacto/trackable.rb', line 537

# Lifts an Operations::Concat built with the given trackable.
#
# @param trackable argument of Operations::Concat.new
# @return the result of lift
def concat(trackable)
  operation = Operations::Concat.new(trackable)
  lift(operation)
end

#count(value = NO_VALUE, &block) ⇒ Object



263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/reacto/trackable.rb', line 263

# Maps every value of the (optionally filtered) source to 1, sums the ones
# with inject and takes the last result.
#
# @param value when given, the source is filtered with
#   Behaviours.same_predicate(value)
# @param block when given (and no value is), the source is filtered with it
# @return the result of the final #last call
def count(value = NO_VALUE, &block)
  source = self
  if value != NO_VALUE
    source = select(&Behaviours.same_predicate(value))
  elsif block_given?
    source = select(&block)
  end

  ones = source.map(1)
  running_total = ones.inject(&:+)
  running_total.last
end

#cycle(n = nil) ⇒ Object



239
240
241
# File 'lib/reacto/trackable.rb', line 239

# Lifts an Operations::Cycle built from this trackable's behaviour and n.
#
# @param n second argument of Operations::Cycle.new; nil by default
# @return the result of lift
def cycle(n = nil)
  operation = Operations::Cycle.new(@behaviour, n)
  lift(operation)
end

#delay(delay) ⇒ Object



549
550
551
# File 'lib/reacto/trackable.rb', line 549

# Shorthand for #buffer with only the delay: keyword set.
#
# @param delay passed to buffer as delay:
# @return the result of buffer
def delay(delay)
  buffer(delay: delay)
end

#delay_each(delay) ⇒ Object



553
554
555
# File 'lib/reacto/trackable.rb', line 553

# Lifts an Operations::DelayEach built with the given delay.
#
# @param delay argument of Operations::DelayEach.new
# @return the result of lift
def delay_each(delay)
  operation = Operations::DelayEach.new(delay)
  lift(operation)
end

#depend_on(trackable, key: :data, &block) ⇒ Object



561
562
563
564
565
# File 'lib/reacto/trackable.rb', line 561

# Lifts an Operations::DependOn built from the trackable, the key and the
# block (passed as accumulator:).
#
# @param trackable first argument of Operations::DependOn.new
# @param key passed on as key:; :data by default
# @return the result of lift
def depend_on(trackable, key: :data, &block)
  operation = Operations::DependOn.new(trackable, key: key, accumulator: block)
  lift(operation)
end

#diff(initial = NO_VALUE, &block) ⇒ Object



478
479
480
481
482
# File 'lib/reacto/trackable.rb', line 478

# Lifts an Operations::Diff built from the block (or
# Operations::Diff::DEFAULT_FN when no block is given) and the initial value.
#
# @param initial second argument of Operations::Diff.new
# @return the result of lift
def diff(initial = NO_VALUE, &block)
  differ = block || Operations::Diff::DEFAULT_FN
  lift(Operations::Diff.new(differ, initial))
end

#do_track(subscription) ⇒ Object



679
680
681
682
683
684
685
# File 'lib/reacto/trackable.rb', line 679

# Runs the behaviour for the subscription: posted to @executor when one is
# set, otherwise called directly.
#
# @param subscription the argument handed to the behaviour
def do_track(subscription)
  return @behaviour.call(subscription) unless @executor

  @executor.post(subscription, &@behaviour)
end

#drop(how_many_to_drop) ⇒ Object Also known as: skip



484
485
486
# File 'lib/reacto/trackable.rb', line 484

# Lifts an Operations::Drop built with how_many_to_drop.
#
# @param how_many_to_drop argument of Operations::Drop.new
# @return the result of lift
def drop(how_many_to_drop)
  operation = Operations::Drop.new(how_many_to_drop)
  lift(operation)
end

#drop_errors ⇒ Object Also known as: skip_errors



493
494
495
# File 'lib/reacto/trackable.rb', line 493

# Lifts a new Operations::DropErrors.
#
# @return the result of lift
def drop_errors
  operation = Operations::DropErrors.new
  lift(operation)
end

#drop_while(&block) ⇒ Object



488
489
490
491
# File 'lib/reacto/trackable.rb', line 488

# Lifts an Operations::DropWhile built from the block, or from
# FALSE_PREDICATE when no block is given.
#
# @return the result of lift
def drop_while(&block)
  condition = block || FALSE_PREDICATE
  lift(Operations::DropWhile.new(condition))
end

#each_cons(n, &block) ⇒ Object

Raises:

  • (ArgumentError)


276
277
278
279
280
281
282
283
284
285
286
# File 'lib/reacto/trackable.rb', line 276

# Lifts an Operations::EachCollect of size n whose reset action keeps
# everything but the first element of the current collection. For n == 1 it
# simply delegates to #each.
#
# @param n [Integer] the size handed to Operations::EachCollect
# @param block when given, it is subscribed through #on
# @raise [ArgumentError] when n is zero or negative
# @return the lifted trackable, or the result of #on / #each
def each_cons(n, &block)
  raise ArgumentError, 'invalid size' if n <= 0
  return each(&block) if n == 1

  drop_first = ->(current) { current[1..-1] }
  collector = Operations::EachCollect.new(
    n, reset_action: drop_first, on_error: NO_ACTION, on_close: NO_ACTION
  )
  windows = lift(collector)
  return windows unless block_given?

  windows.on(&block)
end

#each_slice(n, &block) ⇒ Object

Raises:

  • (ArgumentError)


288
289
290
291
292
293
294
# File 'lib/reacto/trackable.rb', line 288

# Lifts an Operations::EachCollect of size n.
#
# @param n [Integer] the size handed to Operations::EachCollect
# @param block when given, it is subscribed through #on
# @raise [ArgumentError] when n is zero or negative
# @return the lifted trackable, or the result of #on
def each_slice(n, &block)
  raise ArgumentError, 'invalid size' if n <= 0

  slices = lift(Operations::EachCollect.new(n))
  return slices unless block_given?

  slices.on(&block)
end

#each_with_index(&block) ⇒ Object



296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/reacto/trackable.rb', line 296

# Lifts an Operations::EachCollect of size 2 whose collect action appends the
# value and then a running counter to the collection. The counter lives in
# this method's closure and is set back to 0 by the init action.
#
# @param block when given, it is subscribed through #on
# @return the lifted trackable, or the result of #on
def each_with_index(&block)
  position = 0

  reset_position = -> { position = 0 }
  append_pair = lambda do |val, collection|
    collection << val
    collection << position
    position += 1
  end

  collector = Operations::EachCollect.new(
    2, collect_action: append_pair, init_action: reset_position,
    on_error: NO_ACTION, on_close: NO_ACTION
  )
  indexed = lift(collector)
  return indexed unless block_given?

  indexed.on(&block)
end

#each_with_object(obj, &block) ⇒ Object



474
475
476
# File 'lib/reacto/trackable.rb', line 474

# Lifts an Operations::EachWithObject built from the block and obj.
#
# @param obj second argument of Operations::EachWithObject.new
# @return the result of lift
def each_with_object(obj, &block)
  operation = Operations::EachWithObject.new(block, obj)
  lift(operation)
end

#entries(n = nil) ⇒ Object



313
314
315
316
317
318
319
320
321
322
323
324
325
# File 'lib/reacto/trackable.rb', line 313

# Subscribes to the trackable, waits with #await and returns the values
# collected meanwhile.
#
# @param n [Integer, nil] when an Integer, at most n values are taken (via
#   #take); zero or a negative Integer yields [] without subscribing
# @return [Array] the collected values
def entries(n = nil)
  limited = n.is_a?(Integer)
  return [] if limited && n <= 0

  source = limited ? take(n) : self

  collected = []
  subscription = source.on(value: ->(v) { collected << v })
  source.await(subscription)

  collected
end

#execute_on(executor) ⇒ Object



648
649
650
651
652
653
# File 'lib/reacto/trackable.rb', line 648

# Builds a new instance of the same class with this trackable's behaviour and
# the given executor.
#
# @param executor an executor or a key of EXECUTOR_ALIASES
# @return the new instance
def execute_on(executor)
  target = EXECUTOR_ALIASES[executor] || executor

  self.class.new(target, &@behaviour)
end

#find(if_none = NO_VALUE, &block) ⇒ Object Also known as: detect



243
244
245
246
247
248
249
250
251
# File 'lib/reacto/trackable.rb', line 243

# Selects with the block and takes #first of the result. When if_none is
# given, it is appended with condition: :source_empty.
#
# @param if_none the value to append; NO_VALUE means none
# @return the resulting trackable
def find(if_none = NO_VALUE, &block)
  first_match = select(&block).first

  if if_none != NO_VALUE
    first_match.append(if_none, condition: :source_empty)
  else
    first_match
  end
end

#find_index(value = NO_VALUE, &block) ⇒ Object



253
254
255
256
257
258
259
260
261
# File 'lib/reacto/trackable.rb', line 253

# Lifts an Operations::FindIndex whose predicate is an equality check against
# value when one is given, otherwise the block.
#
# @param value the value to compare with; NO_VALUE means use the block
# @return the result of lift
def find_index(value = NO_VALUE, &block)
  predicate = value != NO_VALUE ? ->(v) { value == v } : block

  lift(Operations::FindIndex.new(predicate))
end

#first(n = 1) ⇒ Object

Raises:

  • (ArgumentError)


515
516
517
518
519
# File 'lib/reacto/trackable.rb', line 515

# Delegates to #take after rejecting negative sizes.
#
# @param n [Integer] how many to take; 1 by default
# @raise [ArgumentError] when n is negative
# @return the result of take
def first(n = 1)
  raise ArgumentError, 'negative array size' if n < 0

  take(n)
end

#flat_map(transform = nil, label: nil, &block) ⇒ Object Also known as: collect_concat



373
374
375
376
377
378
379
380
381
# File 'lib/reacto/trackable.rb', line 373

# Lifts a flat-map operation using the block, or +transform+ when no block is
# given: an Operations::OperationOnLabeled with op: :flat_map when a label is
# passed, otherwise an Operations::FlatMap.
#
# @param transform used in place of the block when none is given
# @param label selects the OperationOnLabeled variant when truthy
# @return the result of lift
def flat_map(transform = nil, label: nil, &block)
  mapper = block_given? ? block : transform
  operation =
    if label
      Operations::OperationOnLabeled.new(label, mapper, op: :flat_map)
    else
      Operations::FlatMap.new(mapper)
    end

  lift(operation)
end

#flat_map_latest(transform = nil, &block) ⇒ Object



383
384
385
# File 'lib/reacto/trackable.rb', line 383

# Lifts an Operations::FlatMapLatest built from the block, or from
# +transform+ when no block is given.
#
# @param transform used in place of the block when none is given
# @return the result of lift
def flat_map_latest(transform = nil, &block)
  mapper = block_given? ? block : transform
  lift(Operations::FlatMapLatest.new(mapper))
end

#flatten ⇒ Object



511
512
513
# File 'lib/reacto/trackable.rb', line 511

# Lifts a new Operations::Flatten.
#
# @return the result of lift
def flatten
  operation = Operations::Flatten.new
  lift(operation)
end

#flatten_labeled(initial: NO_VALUE, &block) ⇒ Object



571
572
573
# File 'lib/reacto/trackable.rb', line 571

# Lifts an Operations::FlattenLabeled built from the block and initial.
#
# @param initial second argument of Operations::FlattenLabeled.new
# @return the result of lift
def flatten_labeled(initial: NO_VALUE, &block)
  operation = Operations::FlattenLabeled.new(block, initial)
  lift(operation)
end

#grep(pattern, &block) ⇒ Object



427
428
429
# File 'lib/reacto/trackable.rb', line 427

# Calls select_map with a predicate that is true when pattern === value.
#
# @param pattern the left-hand side of the === check
# @param block forwarded to select_map
# @return the result of select_map
def grep(pattern, &block)
  matches = ->(v) { pattern === v }
  select_map(matches, &block)
end

#grep_v(pattern, &block) ⇒ Object



431
432
433
# File 'lib/reacto/trackable.rb', line 431

# Calls select_map with a predicate that is true when pattern === value does
# not hold.
#
# @param pattern the left-hand side of the === check
# @param block forwarded to select_map
# @return the result of select_map
def grep_v(pattern, &block)
  mismatches = ->(v) { !(pattern === v) }
  select_map(mismatches, &block)
end

#group_by_label(executor: nil, &block) ⇒ Object Also known as: group_by



567
568
569
# File 'lib/reacto/trackable.rb', line 567

# Lifts an Operations::GroupByLabel built from the block and the executor.
# The executor is handed over as given; no EXECUTOR_ALIASES lookup is done
# here.
#
# @param executor second argument of Operations::GroupByLabel.new
# @return the result of lift
def group_by_label(executor: nil, &block)
  operation = Operations::GroupByLabel.new(block, executor)
  lift(operation)
end

#include?(obj) ⇒ Boolean Also known as: member?

Returns:

  • (Boolean)


435
436
437
# File 'lib/reacto/trackable.rb', line 435

# Lifts an Operations::Include built with obj.
#
# @param obj argument of Operations::Include.new
# @return the result of lift
def include?(obj)
  operation = Operations::Include.new(obj)
  lift(operation)
end

#inject(initial = NO_VALUE, label: nil, initial_value: NO_VALUE, &block) ⇒ Object Also known as: reduce



461
462
463
464
465
466
467
468
469
470
471
472
# File 'lib/reacto/trackable.rb', line 461

# Lifts an inject operation using the block: an
# Operations::OperationOnLabeled with op: :inject when a label is passed,
# otherwise an Operations::Inject. Returns self when no block is given.
#
# @param initial positional initial value; wins over initial_value when given
# @param label selects the OperationOnLabeled variant when truthy
# @param initial_value keyword initial value, used when initial is NO_VALUE
# @return self or the result of lift
def inject(initial = NO_VALUE, label: nil, initial_value: NO_VALUE, &block)
  return self unless block_given?

  seed = initial != NO_VALUE ? initial : initial_value
  operation =
    if label
      Operations::OperationOnLabeled.new(
        label, block, op: :inject, initial_value: seed
      )
    else
      Operations::Inject.new(block, seed)
    end

  lift(operation)
end

#last ⇒ Object



525
526
527
# File 'lib/reacto/trackable.rb', line 525

# Lifts a new Operations::Last.
#
# @return the result of lift
def last
  operation = Operations::Last.new
  lift(operation)
end

#lazy ⇒ Object



439
440
441
# File 'lib/reacto/trackable.rb', line 439

# Returns the trackable itself; no wrapper object is created.
#
# @return [self]
def lazy
  self # Just to comply with Enumerable
end

#lift(operation = nil, &block) ⇒ Object



361
362
363
364
365
366
367
368
369
370
371
# File 'lib/reacto/trackable.rb', line 361

# Creates a lifted trackable through create_lifted. For every tracker
# subscription the operation is called with it and, unless the outcome is
# NOTHING, the outcome is handed to lift_behaviour. Any exception raised
# along the way is reported to the subscription's on_error.
#
# @param operation a callable; ignored when a block is given
# @param block used as the operation when present
# @return the result of create_lifted
def lift(operation = nil, &block)
  operation = block if block_given?

  create_lifted do |tracker_subscription|
    begin
      lifted = operation.call(tracker_subscription)
      lift_behaviour(lifted) unless lifted == NOTHING
    rescue Exception => e
      tracker_subscription.on_error(e)
    end
  end
end

#map(val = NO_VALUE, error: nil, close: nil, label: nil, &block) ⇒ Object Also known as: collect



387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
# File 'lib/reacto/trackable.rb', line 387

# Lifts a map operation. The action is the block when given; otherwise
# IDENTITY_ACTION when val is NO_VALUE, or Behaviours.constant(val). With a
# label an Operations::OperationOnLabeled is lifted, otherwise an
# Operations::Map.
#
# @param val value used to build the action when no block is given
# @param error passed on as error:
# @param close passed on as close:
# @param label selects the OperationOnLabeled variant when truthy
# @return the result of lift
def map(val = NO_VALUE, error: nil, close: nil, label: nil, &block)
  action = block
  unless block_given?
    action = val == NO_VALUE ? IDENTITY_ACTION : Behaviours.constant(val)
  end

  operation =
    if label
      Operations::OperationOnLabeled.new(
        label, action, error: error, close: close
      )
    else
      Operations::Map.new(action, error: error, close: close)
    end

  lift(operation)
end

#max(&block) ⇒ Object



403
404
405
# File 'lib/reacto/trackable.rb', line 403

# Lifts an Operations::Extremums with the block passed as action:.
#
# @return the result of lift
def max(&block)
  operation = Operations::Extremums.new(action: block)
  lift(operation)
end

#max_by(&block) ⇒ Object



407
408
409
# File 'lib/reacto/trackable.rb', line 407

# Lifts an Operations::Extremums with the block passed as by:.
#
# @return the result of lift
def max_by(&block)
  operation = Operations::Extremums.new(by: block)
  lift(operation)
end

#merge(*trackables, delay_error: false) ⇒ Object



541
542
543
# File 'lib/reacto/trackable.rb', line 541

# Lifts an Operations::Merge built from the given trackables.
#
# @param trackables passed to Operations::Merge.new as one array
# @param delay_error passed on as delay_error:; false by default
# @return the result of lift
def merge(*trackables, delay_error: false)
  operation = Operations::Merge.new(trackables, delay_error: delay_error)
  lift(operation)
end

#min(&block) ⇒ Object



411
412
413
# File 'lib/reacto/trackable.rb', line 411

# Lifts an Operations::Extremums of type :min with the block passed as
# action:.
#
# @return the result of lift
def min(&block)
  operation = Operations::Extremums.new(action: block, type: :min)
  lift(operation)
end

#min_by(&block) ⇒ Object



415
416
417
# File 'lib/reacto/trackable.rb', line 415

# Lifts an Operations::Extremums of type :min with the block passed as by:.
#
# @return the result of lift
def min_by(&block)
  operation = Operations::Extremums.new(by: block, type: :min)
  lift(operation)
end

#minmax(&block) ⇒ Object



419
420
421
# File 'lib/reacto/trackable.rb', line 419

# Lifts an Operations::Extremums of type :minmax with the block passed as
# action:.
#
# @return the result of lift
def minmax(&block)
  operation = Operations::Extremums.new(action: block, type: :minmax)
  lift(operation)
end

#minmax_by(&block) ⇒ Object



423
424
425
# File 'lib/reacto/trackable.rb', line 423

# Lifts an Operations::Extremums of type :minmax with the block passed as
# by:.
#
# @return the result of lift
def minmax_by(&block)
  operation = Operations::Extremums.new(by: block, type: :minmax)
  lift(operation)
end

#none?(&block) ⇒ Boolean

Returns:

  • (Boolean)


196
197
198
# File 'lib/reacto/trackable.rb', line 196

# Lifts an Operations::BlockingEnumerable for :none? with the given block.
#
# @return the result of lift
def none?(&block)
  operation = Operations::BlockingEnumerable.new(:none?, block)
  lift(operation)
end

#off(notification_tracker = nil) ⇒ Object



346
347
348
# File 'lib/reacto/trackable.rb', line 346

# Placeholder: the body is empty, so the call does nothing and returns nil.
#
# @param notification_tracker currently unused
def off(notification_tracker = nil)
  # Clean-up logic
end

#on(trackers = {}, &block) ⇒ Object Also known as: each, each_entry



335
336
337
338
339
340
341
342
343
344
# File 'lib/reacto/trackable.rb', line 335

# Subscribes a Tracker built from the given callbacks.
#
# @param trackers [Hash] callbacks keyed by topic; only the keys listed in
#   TOPICS are accepted. The hash is never modified.
# @param block when given, it is used as the :value callback (replacing any
#   :value entry of trackers)
# @raise [RuntimeError] when trackers has a key that is not in TOPICS
# @return the result of track
def on(trackers = {}, &block)
  # Merge into a copy: the caller's hash stays untouched and a frozen hash
  # is accepted.
  trackers = trackers.merge(value: block) if block_given?

  unsupported = trackers.keys - TOPICS
  unless unsupported.empty?
    raise "This Trackable supports only #{TOPICS}, " \
      "but #{trackers.keys} were passed."
  end

  track(Tracker.new(trackers))
end

#one?(&block) ⇒ Boolean

Returns:

  • (Boolean)


200
201
202
# File 'lib/reacto/trackable.rb', line 200

# Lifts an Operations::BlockingEnumerable for :one? with the given block.
#
# @return the result of lift
def one?(&block)
  operation = Operations::BlockingEnumerable.new(:one?, block)
  lift(operation)
end

#partition(executor: nil, &block) ⇒ Object



214
215
216
217
218
219
220
221
# File 'lib/reacto/trackable.rb', line 214

# Lifts an Operations::Partition built from the block. Returns self when no
# block is given.
#
# @param executor passed through retrieve_executor; when that yields nil the
#   trackable's own @executor is used
# @return self or the result of lift
def partition(executor: nil, &block)
  return self unless block_given?

  resolved = retrieve_executor(executor)
  chosen = resolved.nil? ? @executor : resolved

  lift(Operations::Partition.new(block, executor: chosen))
end

#prepend(enumerable) ⇒ Object



529
530
531
# File 'lib/reacto/trackable.rb', line 529

# Lifts an Operations::Prepend built with the given enumerable.
#
# @param enumerable argument of Operations::Prepend.new
# @return the result of lift
def prepend(enumerable)
  operation = Operations::Prepend.new(enumerable)
  lift(operation)
end

#reject(&block) ⇒ Object



457
458
459
# File 'lib/reacto/trackable.rb', line 457

# Selects the values for which the block returns a falsy result, by handing
# #select the negated block. Returns self when no block is given, instead of
# building a predicate that would call a nil block.
#
# @param block the predicate to negate
# @return self or the result of select
def reject(&block)
  return self unless block_given?

  select(&->(val) { !block.call(val) })
end

#rescue_and_replace_error(&block) ⇒ Object



593
594
595
596
597
# File 'lib/reacto/trackable.rb', line 593

# Lifts an Operations::RescueAndReplaceError built from the block. Returns
# self when no block is given.
#
# @return self or the result of lift
def rescue_and_replace_error(&block)
  return self unless block_given?

  operation = Operations::RescueAndReplaceError.new(block)
  lift(operation)
end

#rescue_and_replace_error_with(trackable) ⇒ Object



599
600
601
# File 'lib/reacto/trackable.rb', line 599

# Calls rescue_and_replace_error with a block that ignores the error and
# returns the given trackable.
#
# @param trackable the value the block returns
# @return the result of rescue_and_replace_error
def rescue_and_replace_error_with(trackable)
  rescue_and_replace_error do |_error|
    trackable
  end
end

#retry(retries = 1) ⇒ Object



583
584
585
# File 'lib/reacto/trackable.rb', line 583

# Lifts an Operations::Retry built from this trackable's behaviour and the
# number of retries.
#
# @param retries second argument of Operations::Retry.new; 1 by default
# @return the result of lift
def retry(retries = 1)
  operation = Operations::Retry.new(@behaviour, retries)
  lift(operation)
end

#retry_when(&block) ⇒ Object



587
588
589
590
591
# File 'lib/reacto/trackable.rb', line 587

# Lifts an Operations::RetryWhen built from this trackable's behaviour and
# the block. Returns self when no block is given.
#
# @return self or the result of lift
def retry_when(&block)
  return self unless block_given?

  operation = Operations::RetryWhen.new(@behaviour, block)
  lift(operation)
end

#select(label: nil, &block) ⇒ Object Also known as: find_all



447
448
449
450
451
452
453
454
455
# File 'lib/reacto/trackable.rb', line 447

# Lifts a select operation using the block: an
# Operations::OperationOnLabeled with op: :select when a label is passed,
# otherwise an Operations::Select. Returns self when no block is given.
#
# @param label selects the OperationOnLabeled variant when truthy
# @return self or the result of lift
def select(label: nil, &block)
  return self unless block_given?

  operation =
    if label
      Operations::OperationOnLabeled.new(label, block, op: :select)
    else
      Operations::Select.new(block)
    end

  lift(operation)
end

#slice(pattern = NO_ACTION, type:, &block) ⇒ Object



623
624
625
626
627
628
629
630
631
# File 'lib/reacto/trackable.rb', line 623

# Lifts an Operations::Slice whose predicate is `pattern === value` when a
# pattern is given, otherwise the block.
#
# The default of +pattern+ is NO_VALUE, the same sentinel the body tests
# against, so a call with only a block uses that block as the predicate.
#
# @param pattern the left-hand side of the === check; NO_VALUE means use the
#   block
# @param type passed on to Operations::Slice.new as type:
# @param block the predicate used when no pattern is given
# @return the result of lift
def slice(pattern = NO_VALUE, type:, &block)
  predicate =
    if pattern != NO_VALUE
      ->(val) { pattern === val }
    else
      block
    end

  lift(Operations::Slice.new(predicate, type: type))
end

#slice_after(pattern = NO_VALUE, &block) ⇒ Object



615
616
617
# File 'lib/reacto/trackable.rb', line 615

# Shorthand for #slice with type: :after.
#
# @param pattern forwarded to slice; NO_VALUE by default
# @param block forwarded to slice
# @return the result of slice
def slice_after(pattern = NO_VALUE, &block)
  slice(pattern, type: :after, &block)
end

#slice_before(pattern = NO_VALUE, &block) ⇒ Object



619
620
621
# File 'lib/reacto/trackable.rb', line 619

# Shorthand for #slice with type: :before.
#
# @param pattern forwarded to slice; NO_VALUE by default
# @param block forwarded to slice
# @return the result of slice
def slice_before(pattern = NO_VALUE, &block)
  slice(pattern, type: :before, &block)
end

#slice_when(&block) ⇒ Object



633
634
635
# File 'lib/reacto/trackable.rb', line 633

# Lifts an Operations::SliceWhen built from the block.
#
# @return the result of lift
def slice_when(&block)
  operation = Operations::SliceWhen.new(block)
  lift(operation)
end

#sort(&block) ⇒ Object



204
205
206
# File 'lib/reacto/trackable.rb', line 204

# Lifts an Operations::BlockingEnumerable for :sort with the given block.
#
# @return the result of lift
def sort(&block)
  operation = Operations::BlockingEnumerable.new(:sort, block)
  lift(operation)
end

#sort_by(&block) ⇒ Object



208
209
210
211
212
# File 'lib/reacto/trackable.rb', line 208

# Lifts an Operations::BlockingEnumerable for :sort_by with the given block.
# Returns self when no block is given.
#
# @return self or the result of lift
def sort_by(&block)
  return self unless block_given?

  operation = Operations::BlockingEnumerable.new(:sort_by, block)
  lift(operation)
end

#split_labeled(label, executor: nil, &block) ⇒ Object



575
576
577
# File 'lib/reacto/trackable.rb', line 575

# Lifts an Operations::SplitLabeled built from the label, the block and the
# executor. The executor is handed over as given; no EXECUTOR_ALIASES lookup
# is done here.
#
# @param label first argument of Operations::SplitLabeled.new
# @param executor third argument of Operations::SplitLabeled.new
# @return the result of lift
def split_labeled(label, executor: nil, &block)
  operation = Operations::SplitLabeled.new(label, block, executor)
  lift(operation)
end

#take(how_many_to_take) ⇒ Object



497
498
499
# File 'lib/reacto/trackable.rb', line 497

# Lifts an Operations::Take built with how_many_to_take.
#
# @param how_many_to_take argument of Operations::Take.new
# @return the result of lift
def take(how_many_to_take)
  operation = Operations::Take.new(how_many_to_take)
  lift(operation)
end

#take_while(&block) ⇒ Object



501
502
503
504
505
# File 'lib/reacto/trackable.rb', line 501

# Lifts an Operations::TakeWhile built from the block. Returns self when no
# block is given.
#
# @return self or the result of lift
def take_while(&block)
  return self unless block_given?

  operation = Operations::TakeWhile.new(block)
  lift(operation)
end

#throttle(delay) ⇒ Object



557
558
559
# File 'lib/reacto/trackable.rb', line 557

# Lifts an Operations::Throttle built with the given delay.
#
# @param delay argument of Operations::Throttle.new
# @return the result of lift
def throttle(delay)
  operation = Operations::Throttle.new(delay)
  lift(operation)
end

#to_a ⇒ Object



327
328
329
# File 'lib/reacto/trackable.rb', line 327

# Same as calling #entries without an argument.
#
# @return the result of entries
def to_a
  entries
end

#to_h ⇒ Object



331
332
333
# File 'lib/reacto/trackable.rb', line 331

# Converts the result of #to_a with its own #to_h.
#
# @return the result of that to_h call
def to_h
  collected = to_a
  collected.to_h
end

#track(notification_tracker, &block) ⇒ Object



350
351
352
353
354
355
356
357
358
359
# File 'lib/reacto/trackable.rb', line 350

# Wraps the tracker in a Subscriptions::TrackerSubscription, runs the
# behaviour for it through do_track and returns a
# Subscriptions::SubscriptionWrapper around it. With a block, the call is
# handed to #on instead and notification_tracker is not used.
#
# @param notification_tracker the tracker to subscribe
# @param block when given, forwarded to #on
# @return the wrapper, or the result of #on
def track(notification_tracker, &block)
  return on(&block) if block_given?

  raw_subscription =
    Subscriptions::TrackerSubscription.new(notification_tracker, self)
  do_track(raw_subscription)

  Subscriptions::SubscriptionWrapper.new(raw_subscription)
end

#track_on(executor) ⇒ Object



641
642
643
644
645
646
# File 'lib/reacto/trackable.rb', line 641

# Lifts an Operations::TrackOn built with the given executor.
#
# @param executor an executor or a key of EXECUTOR_ALIASES
# @return the result of lift
def track_on(executor)
  target = EXECUTOR_ALIASES[executor] || executor

  lift(Operations::TrackOn.new(target))
end

#uniq ⇒ Object



507
508
509
# File 'lib/reacto/trackable.rb', line 507

# Lifts a new Operations::Uniq.
#
# @return the result of lift
def uniq
  operation = Operations::Uniq.new
  lift(operation)
end

#wrap(**args) ⇒ Object



443
444
445
# File 'lib/reacto/trackable.rb', line 443

# Lifts an Operations::Wrap built with the keyword arguments, handed over as
# a single hash.
#
# @param args the keyword arguments to pass on
# @return the result of lift
def wrap(**args)
  operation = Operations::Wrap.new(args)
  lift(operation)
end

#zip(*trackables, &block) ⇒ Object



637
638
639
# File 'lib/reacto/trackable.rb', line 637

# Calls the class-level zip with self placed before the given trackables.
#
# @param trackables the other trackables
# @param block forwarded to the class-level zip
# @return the result of the class-level zip
def zip(*trackables, &block)
  self.class.zip(self, *trackables, &block)
end