Class: Reacto::Trackable
- Inherits:
-
Object
show all
- Includes:
- Enumerable
- Defined in:
- lib/reacto/trackable.rb
Constant Summary
collapse
- TOPICS =
%i(open value error close)
- EXECUTOR_ALIASES =
{
new_thread: Executors.new_thread,
background: Executors.tasks,
tasks: Executors.tasks,
io: Executors.io,
current: Executors.current,
immediate: Executors.immediate,
now: Executors.immediate
}
Class Method Summary
collapse
-
.close(executor: nil) ⇒ Object
-
.combine(*trackables, &block) ⇒ Object
-
.combine_last(*trackables, &block) ⇒ Object
-
.combine_latest ⇒ Object
-
.concat(*trackables) ⇒ Object
-
.enumerable(enumerable, executor: nil) ⇒ Object
-
.error(err, executor: nil) ⇒ Object
-
.interval(interval, enumerator = Behaviours.integers_enumerator, executor: nil) ⇒ Object
-
.later(secs, value, executor: Reacto::Executors.tasks) ⇒ Object
-
.make(executor_param = nil, executor: nil, &block) ⇒ Object
-
.never ⇒ Object
-
.repeat(array, int: 0.1, executor: nil) ⇒ Object
-
.value(value, executor: nil) ⇒ Object
-
.zip(*trackables, &block) ⇒ Object
Instance Method Summary
collapse
-
#[](x) ⇒ Object
-
#act(on: Operations::Act::ALL, &block) ⇒ Object
-
#all?(&block) ⇒ Boolean
-
#any?(&block) ⇒ Boolean
-
#append(to_append, condition: nil) ⇒ Object
-
#await(subscription, timeout = nil) ⇒ Object
-
#buffer(count: nil, delay: nil) ⇒ Object
-
#chunk(executor: nil, &block) ⇒ Object
-
#chunk_while(executor: nil, &block) ⇒ Object
-
#combine(*trackables, &block) ⇒ Object
(also: #combine_latest)
-
#combine_last(*trackables, &block) ⇒ Object
-
#concat(trackable) ⇒ Object
-
#count(value = NO_VALUE, &block) ⇒ Object
-
#cycle(n = nil) ⇒ Object
-
#delay(delay) ⇒ Object
-
#delay_each(delay) ⇒ Object
-
#depend_on(trackable, key: :data, &block) ⇒ Object
-
#diff(initial = NO_VALUE, &block) ⇒ Object
-
#do_track(subscription) ⇒ Object
-
#drop(how_many_to_drop) ⇒ Object
(also: #skip)
-
#drop_errors ⇒ Object
(also: #skip_errors)
-
#drop_while(&block) ⇒ Object
-
#each_cons(n, &block) ⇒ Object
-
#each_slice(n, &block) ⇒ Object
-
#each_with_index(&block) ⇒ Object
-
#each_with_object(obj, &block) ⇒ Object
-
#entries(n = nil) ⇒ Object
-
#execute_on(executor) ⇒ Object
-
#find(if_none = NO_VALUE, &block) ⇒ Object
(also: #detect)
-
#find_index(value = NO_VALUE, &block) ⇒ Object
-
#first(n = 1) ⇒ Object
-
#flat_map(transform = nil, label: nil, &block) ⇒ Object
(also: #collect_concat)
-
#flat_map_latest(transform = nil, &block) ⇒ Object
-
#flatten ⇒ Object
-
#flatten_labeled(initial: NO_VALUE, &block) ⇒ Object
-
#grep(pattern, &block) ⇒ Object
-
#grep_v(pattern, &block) ⇒ Object
-
#group_by_label(executor: nil, &block) ⇒ Object
(also: #group_by)
-
#include?(obj) ⇒ Boolean
(also: #member?)
-
#initialize(executor = nil, &block) ⇒ Trackable
constructor
A new instance of Trackable.
-
#inject(initial = NO_VALUE, label: nil, initial_value: NO_VALUE, &block) ⇒ Object
(also: #reduce)
-
#last ⇒ Object
-
#lazy ⇒ Object
-
#lift(operation = nil, &block) ⇒ Object
-
#map(val = NO_VALUE, error: nil, close: nil, label: nil, &block) ⇒ Object
(also: #collect)
-
#max(&block) ⇒ Object
-
#max_by(&block) ⇒ Object
-
#merge(*trackables, delay_error: false) ⇒ Object
-
#min(&block) ⇒ Object
-
#min_by(&block) ⇒ Object
-
#minmax(&block) ⇒ Object
-
#minmax_by(&block) ⇒ Object
-
#none?(&block) ⇒ Boolean
-
#off(notification_tracker = nil) ⇒ Object
-
#on(trackers = {}, &block) ⇒ Object
(also: #each, #each_entry)
-
#one?(&block) ⇒ Boolean
-
#partition(executor: nil, &block) ⇒ Object
-
#prepend(enumerable) ⇒ Object
-
#reject(&block) ⇒ Object
-
#rescue_and_replace_error(&block) ⇒ Object
-
#rescue_and_replace_error_with(trackable) ⇒ Object
-
#retry(retries = 1) ⇒ Object
-
#retry_when(&block) ⇒ Object
-
#select(label: nil, &block) ⇒ Object
(also: #find_all)
-
#slice(pattern = NO_ACTION, type:, &block) ⇒ Object
-
#slice_after(pattern = NO_VALUE, &block) ⇒ Object
-
#slice_before(pattern = NO_VALUE, &block) ⇒ Object
-
#slice_when(&block) ⇒ Object
-
#sort(&block) ⇒ Object
-
#sort_by(&block) ⇒ Object
-
#split_labeled(label, executor: nil, &block) ⇒ Object
-
#take(how_many_to_take) ⇒ Object
-
#take_while(&block) ⇒ Object
-
#throttle(delay) ⇒ Object
-
#to_a ⇒ Object
-
#to_h ⇒ Object
-
#track(notification_tracker, &block) ⇒ Object
-
#track_on(executor) ⇒ Object
-
#uniq ⇒ Object
-
#wrap(**args) ⇒ Object
-
#zip(*trackables, &block) ⇒ Object
Constructor Details
#initialize(executor = nil, &block) ⇒ Trackable
Returns a new instance of Trackable.
180
181
182
183
184
185
186
|
# File 'lib/reacto/trackable.rb', line 180
# Stores the behaviour block and the executor of a new Trackable.
#
# @param executor [Object, nil] an executor, or a key of EXECUTOR_ALIASES;
#   a key is replaced by the executor stored under it
# @yield the behaviour to store; NO_ACTION is stored when no block is given
def initialize(executor = nil, &block)
  @executor = EXECUTOR_ALIASES[executor] || executor
  @behaviour = block || NO_ACTION
end
|
Class Method Details
.close(executor: nil) ⇒ Object
53
54
55
|
# File 'lib/reacto/trackable.rb', line 53
# Makes a trackable whose behaviour only calls +on_close+ on its subscriber.
#
# @param executor [Object, nil] forwarded to +make+ as the executor
# @return [Object] whatever +make+ returns
def close(executor: nil)
  make(executor) do |tracker|
    tracker.on_close
  end
end
|
.combine(*trackables, &block) ⇒ Object
.combine_last(*trackables, &block) ⇒ Object
.combine_latest ⇒ Object
166
167
168
169
170
|
# File 'lib/reacto/trackable.rb', line 166
# Delegates to +combine_create+ using Subscriptions::CombiningSubscription.
#
# @param trackables [Array] the trackables to combine
# @yield the combining block, forwarded unchanged
# @return [Object] whatever +combine_create+ returns
def combine(*trackables, &block)
  subscription_type = Subscriptions::CombiningSubscription
  combine_create(subscription_type, *trackables, &block)
end
|
.concat(*trackables) ⇒ Object
39
40
41
|
# File 'lib/reacto/trackable.rb', line 39
# Folds the given trackables left to right by calling +concat+ on the
# accumulated result with each following one.
#
# @param trackables [Array] objects responding to +concat+
# @return [Object, nil] the folded result; nil when nothing is passed
def concat(*trackables)
  head, *tail = trackables
  tail.inject(head) { |chain, following| chain.concat(following) }
end
|
.enumerable(enumerable, executor: nil) ⇒ Object
162
163
164
|
# File 'lib/reacto/trackable.rb', line 162
# Makes a trackable from the behaviour returned by Behaviours.enumerable.
#
# @param enumerable [Object] passed to Behaviours.enumerable
# @param executor [Object, nil] forwarded to +make+
# @return [Object] whatever +make+ returns
def enumerable(enumerable, executor: nil)
  behaviour = Behaviours.enumerable(enumerable)
  make(executor, &behaviour)
end
|
.error(err, executor: nil) ⇒ Object
57
58
59
60
61
|
# File 'lib/reacto/trackable.rb', line 57
# Makes a trackable whose behaviour only calls +on_error+ with +err+.
#
# @param err [Object] the value handed to the subscriber's +on_error+
# @param executor [Object, nil] forwarded to +make+
# @return [Object] whatever +make+ returns
def error(err, executor: nil)
  make(executor) { |tracker| tracker.on_error(err) }
end
|
.interval(interval, enumerator = Behaviours.integers_enumerator, executor: nil) ⇒ Object
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
|
# File 'lib/reacto/trackable.rb', line 88
# Makes a trackable that emits successive values of +enumerator+, waiting
# +interval+ seconds before each one.
#
# @param interval [Numeric] delay in seconds (passed to +sleep+ or used as
#   the TimerTask +execution_interval+)
# @param enumerator [#next] source of values; StopIteration ends emission
# @param executor [Object, nil] an executor or an EXECUTOR_ALIASES key; it
#   only selects which of the two strategies below is used
# @return [Object] whatever +make+ returns
#
# NOTE(review): neither +make+ call below receives +executor+ — confirm
# that this is intended.
def interval(
interval,
enumerator = Behaviours.integers_enumerator,
executor: nil
)
# Replace an alias key by the executor registered under it, if any.
stored = EXECUTOR_ALIASES[executor]
executor = stored if stored
if executor.is_a?(Concurrent::ImmediateExecutor)
# Immediate strategy: sleep and emit inside the behaviour itself.
make do |tracker|
# presumably with_close_and_error handles closing and error reporting
# around the block — TODO confirm against Behaviours
Behaviours.with_close_and_error(tracker) do |subscriber|
while subscriber.subscribed?
sleep interval if subscriber.subscribed?
# Re-check after sleeping: the subscription may have ended meanwhile.
if subscriber.subscribed?
begin
subscriber.on_value(enumerator.next)
rescue StopIteration
break
end
else
break
end
end
end
end
else
# Timer strategy: a TimerTask pushes a token every +interval+ seconds and
# a dedicated thread emits one value per token.
make do |tracker|
# Note: this assigns the class-level Thread.abort_on_exception flag.
Thread.abort_on_exception = true
queue = Queue.new
task = Concurrent::TimerTask.new(execution_interval: interval) do
queue.push('ready')
end
thread = Thread.new do
begin
loop do
# Blocks until the timer pushes the next token.
queue.pop
break unless tracker.subscribed?
begin
value = enumerator.next
tracker.on_value(value)
rescue StopIteration
# Enumerator exhausted: close the tracker if it is still subscribed.
tracker.on_close if tracker.subscribed?
break
rescue StandardError => error
tracker.on_error(error) if tracker.subscribed?
break
end
end
ensure
# The timer is shut down however the emitting loop ends.
task.shutdown
end
end
task.execute
# Registers the timer and the thread with the tracker — presumably so they
# are released together with the subscription; confirm in ExecutorResource.
tracker.add_resource(Reacto::Resources::ExecutorResource.new(
task, threads: [thread]
))
end
end
end
|
.later(secs, value, executor: Reacto::Executors.tasks) ⇒ Object
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
# File 'lib/reacto/trackable.rb', line 70
# Makes a trackable that hands +value+ to Behaviours.single_tracker_value
# after +secs+ seconds.
#
# With a Concurrent::ImmediateExecutor the behaviour sleeps inline; with any
# other executor the emission is scheduled through Concurrent::ScheduledTask.
#
# @param secs [Numeric] delay in seconds
# @param value [Object] the value to emit
# @param executor [Object] an executor or an EXECUTOR_ALIASES key
# @return [Object] whatever +make+ returns
def later(secs, value, executor: Reacto::Executors.tasks)
  executor = EXECUTOR_ALIASES[executor] || executor
  emit = ->(tracker) { Behaviours.single_tracker_value(tracker, value) }

  if executor.is_a?(Concurrent::ImmediateExecutor)
    return make do |tracker|
      sleep secs
      emit.call(tracker)
    end
  end

  make do |tracker|
    Concurrent::ScheduledTask.execute(secs, executor: executor) do
      emit.call(tracker)
    end
  end
end
|
.make(executor_param = nil, executor: nil, &block) ⇒ Object
63
64
65
66
67
68
|
# File 'lib/reacto/trackable.rb', line 63
# Instantiates the receiver with an executor and a behaviour.
#
# @param executor_param [Object, nil] positional executor; takes precedence
#   over the keyword when truthy
# @param executor [Object, nil] keyword executor used otherwise
# @yield the behaviour; NO_ACTION is used when no block is given
# @return [Object] the new instance
def make(executor_param = nil, executor: nil, &block)
  chosen_executor = executor_param || executor
  return new(chosen_executor, &block) if block_given?

  new(chosen_executor, &NO_ACTION)
end
|
.never ⇒ Object
29
30
31
|
# File 'lib/reacto/trackable.rb', line 29
# Instantiates the receiver with no executor and no behaviour block.
#
# @return [Object] the new instance
def never
  new
end
|
.repeat(array, int: 0.1, executor: nil) ⇒ Object
152
153
154
155
156
|
# File 'lib/reacto/trackable.rb', line 152
# Calls +interval+ with the enumerator that
# Behaviours.array_repeat_enumerator builds for +array+.
#
# @param array [Object] passed to Behaviours.array_repeat_enumerator
# @param int [Numeric] the interval handed to +interval+ (default 0.1)
# @param executor [Object, nil] forwarded to +interval+
# @return [Object] whatever +interval+ returns
def repeat(array, int: 0.1, executor: nil)
  repeating = Behaviours.array_repeat_enumerator(array)
  interval(int, repeating, executor: executor)
end
|
.value(value, executor: nil) ⇒ Object
158
159
160
|
# File 'lib/reacto/trackable.rb', line 158
# Makes a trackable from the behaviour returned by Behaviours.single_value.
#
# @param value [Object] passed to Behaviours.single_value
# @param executor [Object, nil] forwarded to +make+
# @return [Object] whatever +make+ returns
def value(value, executor: nil)
  behaviour = Behaviours.single_value(value)
  make(executor, &behaviour)
end
|
.zip(*trackables, &block) ⇒ Object
Instance Method Details
#[](x) ⇒ Object
521
522
523
|
# File 'lib/reacto/trackable.rb', line 521
# Lifts an Operations::Drop built from +x+ and 1.
# Presumably yields only the element at position +x+ — confirm in Drop.
#
# @param x [Object] first argument of Operations::Drop
# @return [Object] whatever +lift+ returns
def [](x)
  operation = Operations::Drop.new(x, 1)
  lift(operation)
end
|
#act(on: Operations::Act::ALL, &block) ⇒ Object
#append(to_append, condition: nil) ⇒ Object
533
534
535
|
# File 'lib/reacto/trackable.rb', line 533
# Lifts an Operations::Append for +to_append+.
#
# @param to_append [Object] first argument of Operations::Append
# @param condition [Object, nil] forwarded as the +condition:+ keyword
# @return [Object] whatever +lift+ returns
def append(to_append, condition: nil)
  operation = Operations::Append.new(to_append, condition: condition)
  lift(operation)
end
|
#await(subscription, timeout = nil) ⇒ Object
655
656
657
658
659
660
661
662
663
664
|
# File 'lib/reacto/trackable.rb', line 655
# Blocks on a latch that is released by a close-or-error listener added to
# +subscription+. Does nothing when the subscription is not subscribed.
#
# Any exception whose message contains 'No live threads left' is swallowed;
# every other exception is re-raised.
#
# @param subscription [Object] must respond to +subscribed?+ and +add+
# @param timeout [Numeric, nil] passed to the latch's +wait+
# @return [Object, nil] the result of +wait+, or nil
def await(subscription, timeout = nil)
  return unless subscription.subscribed?

  finished = Concurrent::CountDownLatch.new(1)
  listener = Subscriptions.on_close_and_error { finished.count_down }
  subscription.add(listener)
  finished.wait(timeout)
rescue Exception => failure
  deadlock = failure.message.include?('No live threads left')
  raise failure unless deadlock
end
|
#buffer(count: nil, delay: nil) ⇒ Object
545
546
547
|
# File 'lib/reacto/trackable.rb', line 545
# Lifts an Operations::Buffer configured with +count+ and +delay+.
#
# @param count [Object, nil] forwarded as +count:+
# @param delay [Object, nil] forwarded as +delay:+
# @return [Object] whatever +lift+ returns
def buffer(count: nil, delay: nil)
  operation = Operations::Buffer.new(count: count, delay: delay)
  lift(operation)
end
|
#chunk(executor: nil, &block) ⇒ Object
223
224
225
226
227
228
229
230
|
# File 'lib/reacto/trackable.rb', line 223
# Lifts an Operations::Chunk built from the block. Returns self when no
# block is given.
#
# @param executor [Object, nil] resolved through +retrieve_executor+; when
#   that yields nil, the receiver's own executor is used
# @yield the block handed to Operations::Chunk
# @return [Object] self, or whatever +lift+ returns
def chunk(executor: nil, &block)
  return self if block.nil?

  resolved = retrieve_executor(executor)
  resolved = @executor if resolved.nil?
  lift(Operations::Chunk.new(block, executor: resolved))
end
|
#chunk_while(executor: nil, &block) ⇒ Object
232
233
234
235
236
237
|
# File 'lib/reacto/trackable.rb', line 232
# Lifts an Operations::ChunkWhile built from the block.
#
# @param executor [Object, nil] resolved through +retrieve_executor+; when
#   that yields nil, the receiver's own executor is used
# @yield the block handed to Operations::ChunkWhile
# @return [Object] whatever +lift+ returns
def chunk_while(executor: nil, &block)
  resolved = retrieve_executor(executor)
  resolved = @executor if resolved.nil?
  operation = Operations::ChunkWhile.new(block, executor: resolved)
  lift(operation)
end
|
#combine(*trackables, &block) ⇒ Object
Also known as:
combine_latest
609
610
611
612
613
|
# File 'lib/reacto/trackable.rb', line 609
# Calls the class-level +combine+ with the receiver placed first.
# Returns self when no block is given.
#
# @param trackables [Array] further trackables to combine with
# @yield the combining block, forwarded unchanged
# @return [Object] self, or whatever the class-level +combine+ returns
def combine(*trackables, &block)
  return self if block.nil?

  self.class.combine(self, *trackables, &block)
end
|
#combine_last(*trackables, &block) ⇒ Object
603
604
605
606
607
|
# File 'lib/reacto/trackable.rb', line 603
# Calls the class-level +combine_last+ with the receiver placed first.
# Returns self when no block is given.
#
# @param trackables [Array] further trackables to combine with
# @yield the combining block, forwarded unchanged
# @return [Object] self, or whatever the class-level +combine_last+ returns
def combine_last(*trackables, &block)
  return self if block.nil?

  self.class.combine_last(self, *trackables, &block)
end
|
#concat(trackable) ⇒ Object
537
538
539
|
# File 'lib/reacto/trackable.rb', line 537
# Lifts an Operations::Concat built from +trackable+.
#
# @param trackable [Object] the argument of Operations::Concat
# @return [Object] whatever +lift+ returns
def concat(trackable)
  operation = Operations::Concat.new(trackable)
  lift(operation)
end
|
#count(value = NO_VALUE, &block) ⇒ Object
263
264
265
266
267
268
269
270
271
272
273
274
|
# File 'lib/reacto/trackable.rb', line 263
# Builds the chain +map(1).inject(&:+).last+ over a source chosen as follows:
# values selected with Behaviours.same_predicate(value) when +value+ is
# given, values selected with the block when a block is given, otherwise the
# receiver itself.
#
# @param value [Object] value to select; NO_VALUE means "not given"
# @yield optional selection block, used only when +value+ is not given
# @return [Object] the result of the final +last+ call
def count(value = NO_VALUE, &block)
  source =
    if value != NO_VALUE
      matcher = Behaviours.same_predicate(value)
      select(&matcher)
    elsif block
      select(&block)
    else
      self
    end

  ones = source.map(1)
  ones.inject(&:+).last
end
|
#cycle(n = nil) ⇒ Object
239
240
241
|
# File 'lib/reacto/trackable.rb', line 239
# Lifts an Operations::Cycle built from the receiver's behaviour and +n+.
#
# @param n [Object, nil] second argument of Operations::Cycle
# @return [Object] whatever +lift+ returns
def cycle(n = nil)
  operation = Operations::Cycle.new(@behaviour, n)
  lift(operation)
end
|
#delay(delay) ⇒ Object
549
550
551
|
# File 'lib/reacto/trackable.rb', line 549
# Shorthand for +buffer+ with only the +delay:+ option set.
#
# @param delay [Object] forwarded as +delay:+
# @return [Object] whatever +buffer+ returns
def delay(delay)
  buffer(delay: delay)
end
|
#delay_each(delay) ⇒ Object
553
554
555
|
# File 'lib/reacto/trackable.rb', line 553
# Lifts an Operations::DelayEach built from +delay+.
#
# @param delay [Object] the argument of Operations::DelayEach
# @return [Object] whatever +lift+ returns
def delay_each(delay)
  operation = Operations::DelayEach.new(delay)
  lift(operation)
end
|
#depend_on(trackable, key: :data, &block) ⇒ Object
561
562
563
564
565
|
# File 'lib/reacto/trackable.rb', line 561
# Lifts an Operations::DependOn for +trackable+.
#
# @param trackable [Object] first argument of Operations::DependOn
# @param key [Object] forwarded as +key:+ (default :data)
# @yield optional block, forwarded as +accumulator:+
# @return [Object] whatever +lift+ returns
def depend_on(trackable, key: :data, &block)
  operation = Operations::DependOn.new(trackable, key: key, accumulator: block)
  lift(operation)
end
|
#diff(initial = NO_VALUE, &block) ⇒ Object
#do_track(subscription) ⇒ Object
679
680
681
682
683
684
685
|
# File 'lib/reacto/trackable.rb', line 679
# Runs the stored behaviour for +subscription+: posted to the receiver's
# executor when one is set, called directly otherwise.
#
# @param subscription [Object] the argument given to the behaviour
# @return [Object] the result of +post+ or of the behaviour call
def do_track(subscription)
  return @behaviour.call(subscription) unless @executor

  @executor.post(subscription, &@behaviour)
end
|
#drop(how_many_to_drop) ⇒ Object
Also known as:
skip
484
485
486
|
# File 'lib/reacto/trackable.rb', line 484
# Lifts an Operations::Drop built from +how_many_to_drop+.
#
# @param how_many_to_drop [Object] the argument of Operations::Drop
# @return [Object] whatever +lift+ returns
def drop(how_many_to_drop)
  operation = Operations::Drop.new(how_many_to_drop)
  lift(operation)
end
|
#drop_errors ⇒ Object
Also known as:
skip_errors
493
494
495
|
# File 'lib/reacto/trackable.rb', line 493
# Lifts a new Operations::DropErrors.
#
# @return [Object] whatever +lift+ returns
def drop_errors
  operation = Operations::DropErrors.new
  lift(operation)
end
|
#drop_while(&block) ⇒ Object
#each_cons(n, &block) ⇒ Object
276
277
278
279
280
281
282
283
284
285
286
|
# File 'lib/reacto/trackable.rb', line 276
# Lifts an Operations::EachCollect of size +n+ whose reset action keeps
# everything but the first collected element. For +n+ == 1 this is just
# +each+ with the block.
#
# @param n [Integer] group size; must be positive
# @yield optional block, attached to the result through +on+
# @return [Object] the lifted trackable, or the result of +on+ / +each+
# @raise [ArgumentError] when +n+ is zero or negative
def each_cons(n, &block)
  raise ArgumentError, 'invalid size' if n <= 0
  return each(&block) if n == 1

  drop_oldest = lambda { |current| current[1..-1] }
  collector = Operations::EachCollect.new(
    n, reset_action: drop_oldest, on_error: NO_ACTION, on_close: NO_ACTION
  )
  trackable = lift(collector)
  return trackable unless block_given?

  trackable.on(&block)
end
|
#each_slice(n, &block) ⇒ Object
288
289
290
291
292
293
294
|
# File 'lib/reacto/trackable.rb', line 288
# Lifts an Operations::EachCollect of size +n+.
#
# @param n [Integer] slice size; must be positive
# @yield optional block, attached to the result through +on+
# @return [Object] the lifted trackable, or the result of +on+
# @raise [ArgumentError] when +n+ is zero or negative
def each_slice(n, &block)
  raise ArgumentError, 'invalid size' if n <= 0

  trackable = lift(Operations::EachCollect.new(n))
  return trackable unless block_given?

  trackable.on(&block)
end
|
#each_with_index(&block) ⇒ Object
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
|
# File 'lib/reacto/trackable.rb', line 296
# Lifts an Operations::EachCollect of size 2 whose collect action appends
# each value followed by a running index, and whose init action resets that
# index to 0.
#
# The counter is a local captured by both lambdas, so it is shared by every
# use of the returned operation.
#
# @yield optional block, attached to the result through +on+
# @return [Object] the lifted trackable, or the result of +on+
def each_with_index(&block)
  position = 0
  append_pair = lambda do |val, collection|
    collection << val
    collection << position
    position += 1
  end
  restart = lambda { position = 0 }

  collector = Operations::EachCollect.new(
    2, collect_action: append_pair, init_action: restart,
    on_error: NO_ACTION, on_close: NO_ACTION
  )
  trackable = lift(collector)
  return trackable unless block_given?

  trackable.on(&block)
end
|
#each_with_object(obj, &block) ⇒ Object
474
475
476
|
# File 'lib/reacto/trackable.rb', line 474
# Lifts an Operations::EachWithObject built from the block and +obj+.
#
# @param obj [Object] second argument of Operations::EachWithObject
# @yield the block handed to Operations::EachWithObject
# @return [Object] whatever +lift+ returns
def each_with_object(obj, &block)
  operation = Operations::EachWithObject.new(block, obj)
  lift(operation)
end
|
#entries(n = nil) ⇒ Object
313
314
315
316
317
318
319
320
321
322
323
324
325
|
# File 'lib/reacto/trackable.rb', line 313
# Subscribes to the receiver (limited with +take+ when +n+ is a positive
# Integer), waits through +await+, and returns the values collected so far.
#
# @param n [Integer, nil] optional limit; a zero or negative Integer
#   short-circuits to an empty array, non-Integer values are ignored
# @return [Array] the values received by the +value+ handler
def entries(n = nil)
  limited = n.is_a?(Integer)
  return [] if limited && n <= 0

  source = limited ? take(n) : self
  collected = []
  subscription = source.on(value: ->(v) { collected << v })
  source.await(subscription)
  collected
end
|
#execute_on(executor) ⇒ Object
648
649
650
651
652
653
|
# File 'lib/reacto/trackable.rb', line 648
# Creates a new instance of the receiver's class with the same behaviour and
# the given executor.
#
# @param executor [Object] an executor, or a key of EXECUTOR_ALIASES that is
#   replaced by the executor stored under it
# @return [Object] the new instance
def execute_on(executor)
  resolved = EXECUTOR_ALIASES[executor] || executor
  self.class.new(resolved, &@behaviour)
end
|
#find(if_none = NO_VALUE, &block) ⇒ Object
Also known as:
detect
243
244
245
246
247
248
249
250
251
|
# File 'lib/reacto/trackable.rb', line 243
# Builds +select(&block).first+ and, when +if_none+ is given, appends it
# with the +:source_empty+ condition.
#
# @param if_none [Object] fallback value; NO_VALUE means "not given"
# @yield the selection block handed to +select+
# @return [Object] the resulting trackable
def find(if_none = NO_VALUE, &block)
  match = select(&block).first
  if_none != NO_VALUE ? match.append(if_none, condition: :source_empty) : match
end
|
#find_index(value = NO_VALUE, &block) ⇒ Object
253
254
255
256
257
258
259
260
261
|
# File 'lib/reacto/trackable.rb', line 253
# Lifts an Operations::FindIndex whose predicate is an equality check against
# +value+ when one is given, and the block otherwise.
#
# @param value [Object] value to look for; NO_VALUE means "not given"
# @yield predicate used when +value+ is not given
# @return [Object] whatever +lift+ returns
def find_index(value = NO_VALUE, &block)
  matcher = value != NO_VALUE ? lambda { |candidate| value == candidate } : block
  lift(Operations::FindIndex.new(matcher))
end
|
#first(n = 1) ⇒ Object
515
516
517
518
519
|
# File 'lib/reacto/trackable.rb', line 515
# Delegates to +take+ after rejecting negative counts.
#
# @param n [Integer] how many to take (default 1)
# @return [Object] whatever +take+ returns
# @raise [ArgumentError] when +n+ is negative
def first(n = 1)
  raise ArgumentError, 'negative array size' if n < 0

  take(n)
end
|
#flat_map(transform = nil, label: nil, &block) ⇒ Object
Also known as:
collect_concat
373
374
375
376
377
378
379
380
381
|
# File 'lib/reacto/trackable.rb', line 373
# Lifts a flat-map operation. The block, when given, takes precedence over
# +transform+. With +label+ the transformation is wrapped in an
# Operations::OperationOnLabeled (op: :flat_map); otherwise an
# Operations::FlatMap is used.
#
# @param transform [Object, nil] transformation used when no block is given
# @param label [Object, nil] optional label
# @yield the transformation
# @return [Object] whatever +lift+ returns
def flat_map(transform = nil, label: nil, &block)
  action = block_given? ? block : transform
  operation =
    if label
      Operations::OperationOnLabeled.new(label, action, op: :flat_map)
    else
      Operations::FlatMap.new(action)
    end
  lift(operation)
end
|
#flat_map_latest(transform = nil, &block) ⇒ Object
383
384
385
|
# File 'lib/reacto/trackable.rb', line 383
# Lifts an Operations::FlatMapLatest. The block, when given, takes
# precedence over +transform+.
#
# @param transform [Object, nil] transformation used when no block is given
# @yield the transformation
# @return [Object] whatever +lift+ returns
def flat_map_latest(transform = nil, &block)
  action = block_given? ? block : transform
  lift(Operations::FlatMapLatest.new(action))
end
|
#flatten ⇒ Object
511
512
513
|
# File 'lib/reacto/trackable.rb', line 511
# Lifts a new Operations::Flatten.
#
# @return [Object] whatever +lift+ returns
def flatten
  operation = Operations::Flatten.new
  lift(operation)
end
|
#flatten_labeled(initial: NO_VALUE, &block) ⇒ Object
#grep(pattern, &block) ⇒ Object
427
428
429
|
# File 'lib/reacto/trackable.rb', line 427
# Calls +select_map+ with a predicate that is true for values matching
# +pattern+ under +===+.
#
# @param pattern [Object] anything responding to +===+
# @yield optional block, forwarded to +select_map+
# @return [Object] whatever +select_map+ returns
def grep(pattern, &block)
  matches = lambda { |v| pattern === v }
  select_map(matches, &block)
end
|
#grep_v(pattern, &block) ⇒ Object
431
432
433
|
# File 'lib/reacto/trackable.rb', line 431
# Calls +select_map+ with a predicate that is true for values NOT matching
# +pattern+ under +===+.
#
# @param pattern [Object] anything responding to +===+
# @yield optional block, forwarded to +select_map+
# @return [Object] whatever +select_map+ returns
def grep_v(pattern, &block)
  mismatches = lambda { |v| !(pattern === v) }
  select_map(mismatches, &block)
end
|
#group_by_label(executor: nil, &block) ⇒ Object
Also known as:
group_by
567
568
569
|
# File 'lib/reacto/trackable.rb', line 567
# Lifts an Operations::GroupByLabel built from the block and +executor+.
# The executor is passed through exactly as given.
#
# @param executor [Object, nil] second argument of Operations::GroupByLabel
# @yield the block handed to Operations::GroupByLabel
# @return [Object] whatever +lift+ returns
def group_by_label(executor: nil, &block)
  operation = Operations::GroupByLabel.new(block, executor)
  lift(operation)
end
|
#include?(obj) ⇒ Boolean
Also known as:
member?
435
436
437
|
# File 'lib/reacto/trackable.rb', line 435
# Lifts an Operations::Include built from +obj+.
#
# @param obj [Object] the argument of Operations::Include
# @return [Object] whatever +lift+ returns
def include?(obj)
  operation = Operations::Include.new(obj)
  lift(operation)
end
|
#inject(initial = NO_VALUE, label: nil, initial_value: NO_VALUE, &block) ⇒ Object
Also known as:
reduce
461
462
463
464
465
466
467
468
469
470
471
472
|
# File 'lib/reacto/trackable.rb', line 461
# Lifts an inject operation built from the block. Returns self when no block
# is given.
#
# The initial value is the positional +initial+ when given, otherwise the
# +initial_value:+ keyword. With +label+ the block is wrapped in an
# Operations::OperationOnLabeled (op: :inject); otherwise an
# Operations::Inject is used.
#
# @param initial [Object] positional initial value; NO_VALUE means "not given"
# @param label [Object, nil] optional label
# @param initial_value [Object] keyword initial value
# @yield the block handed to the operation
# @return [Object] self, or whatever +lift+ returns
def inject(initial = NO_VALUE, label: nil, initial_value: NO_VALUE, &block)
  return self if block.nil?

  seed = initial != NO_VALUE ? initial : initial_value
  operation =
    if label
      Operations::OperationOnLabeled.new(
        label, block, op: :inject, initial_value: seed
      )
    else
      Operations::Inject.new(block, seed)
    end
  lift(operation)
end
|
#last ⇒ Object
525
526
527
|
# File 'lib/reacto/trackable.rb', line 525
# Lifts a new Operations::Last.
#
# @return [Object] whatever +lift+ returns
def last
  operation = Operations::Last.new
  lift(operation)
end
|
#lazy ⇒ Object
439
440
441
|
# File 'lib/reacto/trackable.rb', line 439
# Returns the receiver itself.
#
# @return [Object] self
def lazy
  self
end
|
#lift(operation = nil, &block) ⇒ Object
361
362
363
364
365
366
367
368
369
370
371
|
# File 'lib/reacto/trackable.rb', line 361
# Creates a lifted trackable through +create_lifted+. The operation (the
# block when given, else +operation+) is called with the tracker
# subscription; its result is handed to +lift_behaviour+ unless it equals
# NOTHING.
#
# Anything raised while doing so is passed to the subscription's +on_error+.
# NOTE(review): this rescues Exception, not only StandardError — confirm
# that capturing every exception class is intended.
#
# @param operation [#call, nil] used when no block is given
# @yield the operation
# @return [Object] whatever +create_lifted+ returns
def lift(operation = nil, &block)
  transform = block || operation
  create_lifted do |downstream|
    begin
      upstream = transform.call(downstream)
      lift_behaviour(upstream) unless upstream == NOTHING
    rescue Exception => failure
      downstream.on_error(failure)
    end
  end
end
|
#map(val = NO_VALUE, error: nil, close: nil, label: nil, &block) ⇒ Object
Also known as:
collect
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
|
# File 'lib/reacto/trackable.rb', line 387
# Lifts a map operation. The action is the block when given; otherwise
# IDENTITY_ACTION when +val+ is not given, or Behaviours.constant(val).
# With +label+ the action is wrapped in an Operations::OperationOnLabeled;
# otherwise an Operations::Map is used.
#
# @param val [Object] constant replacement; NO_VALUE means "not given"
# @param error [Object, nil] forwarded as +error:+
# @param close [Object, nil] forwarded as +close:+
# @param label [Object, nil] optional label
# @yield the mapping action
# @return [Object] whatever +lift+ returns
def map(val = NO_VALUE, error: nil, close: nil, label: nil, &block)
  action =
    if block_given?
      block
    elsif val == NO_VALUE
      IDENTITY_ACTION
    else
      Behaviours.constant(val)
    end

  operation =
    if label
      Operations::OperationOnLabeled.new(label, action, error: error, close: close)
    else
      Operations::Map.new(action, error: error, close: close)
    end
  lift(operation)
end
|
#max(&block) ⇒ Object
403
404
405
|
# File 'lib/reacto/trackable.rb', line 403
# Lifts an Operations::Extremums with the block as +action:+.
#
# @yield optional block, forwarded as +action:+
# @return [Object] whatever +lift+ returns
def max(&block)
  operation = Operations::Extremums.new(action: block)
  lift(operation)
end
|
#max_by(&block) ⇒ Object
407
408
409
|
# File 'lib/reacto/trackable.rb', line 407
# Lifts an Operations::Extremums with the block as +by:+.
#
# @yield optional block, forwarded as +by:+
# @return [Object] whatever +lift+ returns
def max_by(&block)
  operation = Operations::Extremums.new(by: block)
  lift(operation)
end
|
#merge(*trackables, delay_error: false) ⇒ Object
541
542
543
|
# File 'lib/reacto/trackable.rb', line 541
# Lifts an Operations::Merge built from the given trackables.
#
# @param trackables [Array] passed to Operations::Merge as one array
# @param delay_error [Boolean] forwarded as +delay_error:+ (default false)
# @return [Object] whatever +lift+ returns
def merge(*trackables, delay_error: false)
  operation = Operations::Merge.new(trackables, delay_error: delay_error)
  lift(operation)
end
|
#min(&block) ⇒ Object
411
412
413
|
# File 'lib/reacto/trackable.rb', line 411
# Lifts an Operations::Extremums of type :min with the block as +action:+.
#
# @yield optional block, forwarded as +action:+
# @return [Object] whatever +lift+ returns
def min(&block)
  operation = Operations::Extremums.new(action: block, type: :min)
  lift(operation)
end
|
#min_by(&block) ⇒ Object
415
416
417
|
# File 'lib/reacto/trackable.rb', line 415
# Lifts an Operations::Extremums of type :min with the block as +by:+.
#
# @yield optional block, forwarded as +by:+
# @return [Object] whatever +lift+ returns
def min_by(&block)
  operation = Operations::Extremums.new(by: block, type: :min)
  lift(operation)
end
|
#minmax(&block) ⇒ Object
419
420
421
|
# File 'lib/reacto/trackable.rb', line 419
# Lifts an Operations::Extremums of type :minmax with the block as +action:+.
#
# @yield optional block, forwarded as +action:+
# @return [Object] whatever +lift+ returns
def minmax(&block)
  operation = Operations::Extremums.new(action: block, type: :minmax)
  lift(operation)
end
|
#minmax_by(&block) ⇒ Object
423
424
425
|
# File 'lib/reacto/trackable.rb', line 423
# Lifts an Operations::Extremums of type :minmax with the block as +by:+.
#
# @yield optional block, forwarded as +by:+
# @return [Object] whatever +lift+ returns
def minmax_by(&block)
  operation = Operations::Extremums.new(by: block, type: :minmax)
  lift(operation)
end
|
#off(notification_tracker = nil) ⇒ Object
346
347
348
|
# File 'lib/reacto/trackable.rb', line 346
# Currently does nothing: the argument is ignored and nil is returned.
#
# @param notification_tracker [Object, nil] unused
# @return [nil]
def off(notification_tracker = nil)
  nil
end
|
#on(trackers = {}, &block) ⇒ Object
Also known as:
each, each_entry
335
336
337
338
339
340
341
342
343
344
|
# File 'lib/reacto/trackable.rb', line 335
# Tracks the receiver with a Tracker built from the given handlers.
#
# A block, when given, becomes the +:value+ handler and overrides any
# +:value+ entry in +trackers+. The handlers are merged into a new hash, so
# the hash passed by the caller is never modified (it can be reused for
# later calls, or be frozen).
#
# @param trackers [Hash] handlers keyed by topic; only keys listed in TOPICS
#   are accepted
# @yield optional +:value+ handler
# @return [Object] whatever +track+ returns
# @raise [RuntimeError] when +trackers+ contains a key outside TOPICS
def on(trackers = {}, &block)
  # Merge instead of assigning, to leave the caller's hash untouched.
  trackers = trackers.merge(value: block) if block_given?

  unless (trackers.keys - TOPICS).empty?
    raise "This Trackable supports only #{TOPICS}, " \
          "but #{trackers.keys} were passed."
  end

  track(Tracker.new(trackers))
end
|
#partition(executor: nil, &block) ⇒ Object
214
215
216
217
218
219
220
221
|
# File 'lib/reacto/trackable.rb', line 214
# Lifts an Operations::Partition built from the block. Returns self when no
# block is given.
#
# @param executor [Object, nil] resolved through +retrieve_executor+; when
#   that yields nil, the receiver's own executor is used
# @yield the block handed to Operations::Partition
# @return [Object] self, or whatever +lift+ returns
def partition(executor: nil, &block)
  return self if block.nil?

  resolved = retrieve_executor(executor)
  resolved = @executor if resolved.nil?
  lift(Operations::Partition.new(block, executor: resolved))
end
|
#prepend(enumerable) ⇒ Object
529
530
531
|
# File 'lib/reacto/trackable.rb', line 529
# Lifts an Operations::Prepend built from +enumerable+.
#
# @param enumerable [Object] the argument of Operations::Prepend
# @return [Object] whatever +lift+ returns
def prepend(enumerable)
  operation = Operations::Prepend.new(enumerable)
  lift(operation)
end
|
#reject(&block) ⇒ Object
457
458
459
|
# File 'lib/reacto/trackable.rb', line 457
# Selects the values for which the block returns a falsy result, by calling
# +select+ with the negated block.
#
# Returns self when no block is given, matching +select+, +take_while+ and
# +sort_by+; without this guard the negated predicate would call +nil.call+
# for every value.
#
# @yield the predicate to negate
# @return [Object] self, or whatever +select+ returns
def reject(&block)
  return self unless block_given?

  select(&->(val) { !block.call(val) })
end
|
#rescue_and_replace_error(&block) ⇒ Object
593
594
595
596
597
|
# File 'lib/reacto/trackable.rb', line 593
# Lifts an Operations::RescueAndReplaceError built from the block.
# Returns self when no block is given.
#
# @yield the block handed to Operations::RescueAndReplaceError
# @return [Object] self, or whatever +lift+ returns
def rescue_and_replace_error(&block)
  return self if block.nil?

  operation = Operations::RescueAndReplaceError.new(block)
  lift(operation)
end
|
#rescue_and_replace_error_with(trackable) ⇒ Object
599
600
601
|
# File 'lib/reacto/trackable.rb', line 599
# Calls +rescue_and_replace_error+ with a block that ignores the error and
# returns +trackable+.
#
# @param trackable [Object] the value the block returns
# @return [Object] whatever +rescue_and_replace_error+ returns
def rescue_and_replace_error_with(trackable)
  rescue_and_replace_error do |_error|
    trackable
  end
end
|
#retry(retries = 1) ⇒ Object
583
584
585
|
# File 'lib/reacto/trackable.rb', line 583
# Lifts an Operations::Retry built from the receiver's behaviour and
# +retries+.
#
# @param retries [Object] second argument of Operations::Retry (default 1)
# @return [Object] whatever +lift+ returns
def retry(retries = 1)
  operation = Operations::Retry.new(@behaviour, retries)
  lift(operation)
end
|
#retry_when(&block) ⇒ Object
587
588
589
590
591
|
# File 'lib/reacto/trackable.rb', line 587
# Lifts an Operations::RetryWhen built from the receiver's behaviour and the
# block. Returns self when no block is given.
#
# @yield the block handed to Operations::RetryWhen
# @return [Object] self, or whatever +lift+ returns
def retry_when(&block)
  return self if block.nil?

  operation = Operations::RetryWhen.new(@behaviour, block)
  lift(operation)
end
|
#select(label: nil, &block) ⇒ Object
Also known as:
find_all
447
448
449
450
451
452
453
454
455
|
# File 'lib/reacto/trackable.rb', line 447
# Lifts a select operation built from the block. Returns self when no block
# is given. With +label+ the block is wrapped in an
# Operations::OperationOnLabeled (op: :select); otherwise an
# Operations::Select is used.
#
# @param label [Object, nil] optional label
# @yield the selection predicate
# @return [Object] self, or whatever +lift+ returns
def select(label: nil, &block)
  return self if block.nil?

  operation =
    if label
      Operations::OperationOnLabeled.new(label, block, op: :select)
    else
      Operations::Select.new(block)
    end
  lift(operation)
end
|
#slice(pattern = NO_ACTION, type:, &block) ⇒ Object
623
624
625
626
627
628
629
630
631
|
# File 'lib/reacto/trackable.rb', line 623
# Lifts an Operations::Slice whose predicate is a +===+ match against
# +pattern+ when a pattern is given, and the block otherwise.
#
# "Not given" covers both sentinels: NO_VALUE (what +slice_after+ and
# +slice_before+ forward) and NO_ACTION (this method's own default).
# Checking only NO_VALUE made a direct call such as
# +slice(type: :after) { ... }+ match against NO_ACTION and ignore the block.
#
# @param pattern [Object] anything responding to +===+; NO_VALUE or
#   NO_ACTION means "not given"
# @param type [Object] forwarded to Operations::Slice as +type:+
# @yield predicate used when no pattern is given
# @return [Object] whatever +lift+ returns
def slice(pattern = NO_ACTION, type:, &block)
  pattern_given = pattern != NO_VALUE && !pattern.equal?(NO_ACTION)
  predicate = pattern_given ? ->(val) { pattern === val } : block
  lift(Operations::Slice.new(predicate, type: type))
end
|
#slice_after(pattern = NO_VALUE, &block) ⇒ Object
615
616
617
|
# File 'lib/reacto/trackable.rb', line 615
# Shorthand for +slice+ with +type: :after+.
#
# @param pattern [Object] forwarded to +slice+; NO_VALUE means "not given"
# @yield optional predicate, forwarded to +slice+
# @return [Object] whatever +slice+ returns
def slice_after(pattern = NO_VALUE, &block)
  slice(pattern, type: :after, &block)
end
|
#slice_before(pattern = NO_VALUE, &block) ⇒ Object
619
620
621
|
# File 'lib/reacto/trackable.rb', line 619
# Shorthand for +slice+ with +type: :before+.
#
# @param pattern [Object] forwarded to +slice+; NO_VALUE means "not given"
# @yield optional predicate, forwarded to +slice+
# @return [Object] whatever +slice+ returns
def slice_before(pattern = NO_VALUE, &block)
  slice(pattern, type: :before, &block)
end
|
#slice_when(&block) ⇒ Object
633
634
635
|
# File 'lib/reacto/trackable.rb', line 633
# Lifts an Operations::SliceWhen built from the block.
#
# @yield the block handed to Operations::SliceWhen
# @return [Object] whatever +lift+ returns
def slice_when(&block)
  operation = Operations::SliceWhen.new(block)
  lift(operation)
end
|
#sort_by(&block) ⇒ Object
208
209
210
211
212
|
# File 'lib/reacto/trackable.rb', line 208
# Lifts an Operations::BlockingEnumerable for +:sort_by+ with the block.
# Returns self when no block is given.
#
# @yield the block handed to Operations::BlockingEnumerable
# @return [Object] self, or whatever +lift+ returns
def sort_by(&block)
  return self if block.nil?

  operation = Operations::BlockingEnumerable.new(:sort_by, block)
  lift(operation)
end
|
#split_labeled(label, executor: nil, &block) ⇒ Object
575
576
577
|
# File 'lib/reacto/trackable.rb', line 575
# Lifts an Operations::SplitLabeled built from +label+, the block and
# +executor+. The executor is passed through exactly as given.
#
# @param label [Object] first argument of Operations::SplitLabeled
# @param executor [Object, nil] third argument of Operations::SplitLabeled
# @yield the block handed to Operations::SplitLabeled
# @return [Object] whatever +lift+ returns
def split_labeled(label, executor: nil, &block)
  operation = Operations::SplitLabeled.new(label, block, executor)
  lift(operation)
end
|
#take(how_many_to_take) ⇒ Object
497
498
499
|
# File 'lib/reacto/trackable.rb', line 497
# Lifts an Operations::Take built from +how_many_to_take+.
#
# @param how_many_to_take [Object] the argument of Operations::Take
# @return [Object] whatever +lift+ returns
def take(how_many_to_take)
  operation = Operations::Take.new(how_many_to_take)
  lift(operation)
end
|
#take_while(&block) ⇒ Object
501
502
503
504
505
|
# File 'lib/reacto/trackable.rb', line 501
# Lifts an Operations::TakeWhile built from the block.
# Returns self when no block is given.
#
# @yield the block handed to Operations::TakeWhile
# @return [Object] self, or whatever +lift+ returns
def take_while(&block)
  return self if block.nil?

  operation = Operations::TakeWhile.new(block)
  lift(operation)
end
|
#throttle(delay) ⇒ Object
557
558
559
|
# File 'lib/reacto/trackable.rb', line 557
# Lifts an Operations::Throttle built from +delay+.
#
# @param delay [Object] the argument of Operations::Throttle
# @return [Object] whatever +lift+ returns
def throttle(delay)
  operation = Operations::Throttle.new(delay)
  lift(operation)
end
|
#to_a ⇒ Object
327
328
329
|
# File 'lib/reacto/trackable.rb', line 327
# Returns the result of +entries+ called without a limit.
#
# @return [Object] whatever +entries+ returns
def to_a
  entries
end
|
#to_h ⇒ Object
331
332
333
|
# File 'lib/reacto/trackable.rb', line 331
# Converts the result of +to_a+ with +to_h+.
#
# @return [Object] the result of calling +to_h+ on +to_a+
def to_h
  collected = to_a
  collected.to_h
end
|
#track(notification_tracker, &block) ⇒ Object
#track_on(executor) ⇒ Object
641
642
643
644
645
646
|
# File 'lib/reacto/trackable.rb', line 641
# Lifts an Operations::TrackOn for the given executor.
#
# @param executor [Object] an executor, or a key of EXECUTOR_ALIASES that is
#   replaced by the executor stored under it
# @return [Object] whatever +lift+ returns
def track_on(executor)
  resolved = EXECUTOR_ALIASES[executor] || executor
  lift(Operations::TrackOn.new(resolved))
end
|
#uniq ⇒ Object
507
508
509
|
# File 'lib/reacto/trackable.rb', line 507
# Lifts a new Operations::Uniq.
#
# @return [Object] whatever +lift+ returns
def uniq
  operation = Operations::Uniq.new
  lift(operation)
end
|
#wrap(**args) ⇒ Object
443
444
445
|
# File 'lib/reacto/trackable.rb', line 443
# Lifts an Operations::Wrap built from the given keyword arguments, which
# are passed on as a single Hash.
#
# @param args [Hash] keyword arguments collected into one Hash
# @return [Object] whatever +lift+ returns
def wrap(**args)
  operation = Operations::Wrap.new(args)
  lift(operation)
end
|
#zip(*trackables, &block) ⇒ Object
637
638
639
|
# File 'lib/reacto/trackable.rb', line 637
# Calls the class-level +zip+ with the receiver placed first.
#
# @param trackables [Array] further trackables to zip with
# @yield optional block, forwarded unchanged
# @return [Object] whatever the class-level +zip+ returns
def zip(*trackables, &block)
  self.class.zip(self, *trackables, &block)
end
|