Class: Reacto::Trackable

Inherits:
Object
  • Object
show all
Defined in:
lib/reacto/trackable.rb

Direct Known Subclasses

LabeledTrackable, SharedTrackable

Constant Summary collapse

# Notification topics a tracker hash may use as keys.
# Frozen so the shared list cannot be mutated at runtime.
TOPICS = %i[open value error close].freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(behaviour = NO_ACTION, executor = nil, &block) ⇒ Trackable

Returns a new instance of Trackable.



171
172
173
174
# File 'lib/reacto/trackable.rb', line 171

# Stores the behaviour and executor this trackable will use.
#
# @param behaviour [Object] kept as the behaviour when no block is given
# @param executor [Object, nil] kept as-is
# @yield when a block is passed it is stored instead of +behaviour+
def initialize(behaviour = NO_ACTION, executor = nil, &block)
  @executor = executor
  @behaviour = block || behaviour
end

Class Method Details

.close(executor = nil) ⇒ Object



38
39
40
41
42
# File 'lib/reacto/trackable.rb', line 38

# Builds a trackable (via +make+) whose behaviour calls +on_close+ on the
# subscriber it receives.
#
# @param executor [Object, nil] forwarded to +make+
def close(executor = nil)
  make(nil, executor) { |tracker| tracker.on_close }
end

.combine(*trackables, &block) ⇒ Object



22
23
24
25
26
# File 'lib/reacto/trackable.rb', line 22

# Delegates to +combine_create+ with Subscriptions::CombiningSubscription.
#
# @param trackables [Array] forwarded unchanged
# @yield forwarded unchanged to +combine_create+
def combine(*trackables, &block)
  subscription_type = Subscriptions::CombiningSubscription
  combine_create(subscription_type, *trackables, &block)
end

.combine_last(*trackables, &block) ⇒ Object



28
29
30
31
32
# File 'lib/reacto/trackable.rb', line 28

# Delegates to +combine_create+ with
# Subscriptions::CombiningLastSubscription.
#
# @param trackables [Array] forwarded unchanged
# @yield forwarded unchanged to +combine_create+
def combine_last(*trackables, &block)
  subscription_type = Subscriptions::CombiningLastSubscription
  combine_create(subscription_type, *trackables, &block)
end

.combine_with(function, *trackables) ⇒ Object



156
157
# File 'lib/reacto/trackable.rb', line 156

# NOTE(review): the body is empty, so this currently ignores +function+ and
# +trackables+ and returns nil. It looks like an unimplemented placeholder;
# confirm before relying on it.
def combine_with(function, *trackables)
end

.enumerable(enumerable, executor = nil) ⇒ Object



141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/reacto/trackable.rb', line 141

# Builds a trackable (via +make+) that feeds every element of +enumerable+
# to the tracker and then closes it.
#
# Iteration stops as soon as the tracker reports it is no longer subscribed.
# Errors raised while iterating or closing are passed to +on_error+, again
# only while the tracker is still subscribed.
#
# @param enumerable [#each] source of values
# @param executor [Object, nil] forwarded to +make+
def enumerable(enumerable, executor = nil)
  make(nil, executor) do |tracker|
    begin
      enumerable.each do |item|
        if tracker.subscribed?
          tracker.on_value(item)
        else
          break
        end
      end

      if tracker.subscribed?
        tracker.on_close
      end
    rescue => failure
      if tracker.subscribed?
        tracker.on_error(failure)
      end
    end
  end
end

.error(err, executor = nil) ⇒ Object



44
45
46
47
48
# File 'lib/reacto/trackable.rb', line 44

# Builds a trackable (via +make+) whose behaviour passes +err+ to the
# subscriber's +on_error+.
#
# @param err [Object] the value handed to +on_error+
# @param executor [Object, nil] forwarded to +make+
def error(err, executor = nil)
  make(nil, executor) { |tracker| tracker.on_error(err) }
end

.interval(interval, enumerator = Behaviours.integers_enumerator, executor: nil) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/reacto/trackable.rb', line 70

# Builds a trackable that emits values pulled from +enumerator+, one per
# +interval+.
#
# @param interval [Numeric] passed to +sleep+ in the immediate branch and used
#   as the TimerTask +execution_interval+ otherwise
# @param enumerator [#next] source of values; StopIteration ends the emission
# @param executor [Object, nil] only inspected to choose a branch.
#   NOTE(review): it is not forwarded to +make+ in either branch — confirm
#   that is intended.
def interval(
  interval,
  enumerator = Behaviours.integers_enumerator,
  executor: nil
)
  if executor.is_a?(Concurrent::ImmediateExecutor)
    # Immediate executor: the behaviour itself loops and paces with +sleep+
    # instead of using a timer.
    make do |tracker|
      Behaviours.with_close_and_error(tracker) do |subscriber|
        while subscriber.subscribed?
          sleep interval if subscriber.subscribed?
          # Re-check after sleeping: the subscriber may have unsubscribed
          # while this loop was waiting.
          if subscriber.subscribed?
            begin
              subscriber.on_value(enumerator.next)
            rescue StopIteration
              # Enumerator exhausted; closing is presumably handled by
              # Behaviours.with_close_and_error — TODO confirm.
              break
            end
          else
            break
          end
        end
      end
    end
  else
    make do |tracker|
      # NOTE(review): this flips the Thread-wide abort_on_exception flag for
      # the whole process every time the behaviour runs.
      Thread::abort_on_exception = true

      # The timer task pushes a token each tick; the worker thread below
      # blocks on the queue and emits one value per token.
      queue = Queue.new
      task = Concurrent::TimerTask.new(execution_interval: interval) do
        queue.push('ready')
      end

      thread = Thread.new do
        begin
          loop do
            queue.pop
            break unless tracker.subscribed?

            begin
              value = enumerator.next
              tracker.on_value(value)
            rescue StopIteration
              # Enumerator exhausted: close the tracker if still subscribed.
              tracker.on_close if tracker.subscribed?
              break
            rescue StandardError => error
              tracker.on_error(error) if tracker.subscribed?
              break
            end
          end
        ensure
          # Stop the timer however the worker loop ends.
          task.shutdown
        end
      end
      task.execute

      # Register the task and thread with the tracker as a resource
      # (presumably so they are released on unsubscribe — TODO confirm).
      tracker.add_resource(Reacto::Resources::ExecutorResource.new(
        task, threads: [thread]
      ))
    end
  end
end

.later(secs, value, executor: Reacto::Executors.tasks) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/reacto/trackable.rb', line 55

# Builds a trackable (via +make+) that hands +value+ to
# Behaviours.single_tracker_value after +secs+.
#
# With a Concurrent::ImmediateExecutor the behaviour sleeps inline; otherwise
# the delivery is scheduled through Concurrent::ScheduledTask on +executor+.
#
# @param secs [Numeric] delay passed to +sleep+ or to the scheduled task
# @param value [Object] the value to deliver
# @param executor [Object] decides the branch and runs the scheduled task
def later(secs, value, executor: Reacto::Executors.tasks)
  inline = executor.is_a?(Concurrent::ImmediateExecutor)

  make do |tracker|
    if inline
      sleep secs
      Behaviours.single_tracker_value(tracker, value)
    else
      Concurrent::ScheduledTask.execute(secs, executor: executor) do
        Behaviours.single_tracker_value(tracker, value)
      end
    end
  end
end

.make(behaviour = NO_ACTION, executor = nil, &block) ⇒ Object



50
51
52
53
# File 'lib/reacto/trackable.rb', line 50

# Instantiates the receiver with a behaviour and an executor.
#
# @param behaviour [Object] used when no block is given
# @param executor [Object, nil] passed through to +new+
# @yield when a block is passed it replaces +behaviour+
def make(behaviour = NO_ACTION, executor = nil, &block)
  new(block || behaviour, executor)
end

.never ⇒ Object



18
19
20
# File 'lib/reacto/trackable.rb', line 18

# Instantiates the receiver with no arguments, i.e. with the constructor's
# defaults.
def never
  new
end

.repeat(array, int: 0.1, executor: nil) ⇒ Object



131
132
133
134
135
# File 'lib/reacto/trackable.rb', line 131

# Delegates to +interval+ using Behaviours.array_repeat_enumerator(array) as
# the value source.
#
# @param array [Object] handed to Behaviours.array_repeat_enumerator
# @param int [Numeric] interval forwarded to +interval+
# @param executor [Object, nil] forwarded to +interval+
def repeat(array, int: 0.1, executor: nil)
  source = Behaviours.array_repeat_enumerator(array)
  interval(int, source, executor: executor)
end

.value(value, executor = nil) ⇒ Object



137
138
139
# File 'lib/reacto/trackable.rb', line 137

# Builds a trackable (via +make+) from Behaviours.single_value(value).
#
# @param value [Object] handed to Behaviours.single_value
# @param executor [Object, nil] forwarded to +make+
def value(value, executor = nil)
  behaviour = Behaviours.single_value(value)
  make(behaviour, executor)
end

.zip(*trackables, &block) ⇒ Object



34
35
36
# File 'lib/reacto/trackable.rb', line 34

# Delegates to +combine_create+ with Subscriptions::ZippingSubscription.
#
# @param trackables [Array] forwarded unchanged
# @yield forwarded unchanged to +combine_create+
def zip(*trackables, &block)
  subscription_type = Subscriptions::ZippingSubscription
  combine_create(subscription_type, *trackables, &block)
end

Instance Method Details

#[](x) ⇒ Object



264
265
266
# File 'lib/reacto/trackable.rb', line 264

# Lifts Operations::Drop.new(x, 1).
# NOTE(review): presumably this skips +x+ values and keeps a single one —
# confirm against Operations::Drop.
#
# @param x [Object] first argument for Operations::Drop
def [](x)
  operation = Operations::Drop.new(x, 1)
  lift(operation)
end

#act(action = NO_ACTION, on: Operations::Act::ALL, &block) ⇒ Object



313
314
315
# File 'lib/reacto/trackable.rb', line 313

# Lifts an Operations::Act built from an action and the +on+ selector.
#
# @param action [Object] used when no block is given
# @param on [Object] second argument for Operations::Act
# @yield when a block is passed it replaces +action+
def act(action = NO_ACTION, on: Operations::Act::ALL, &block)
  handler = block || action
  lift(Operations::Act.new(handler, on))
end

#await(subscription, timeout = nil) ⇒ Object



325
326
327
328
329
# File 'lib/reacto/trackable.rb', line 325

# Waits on a one-count latch that is released by a close-and-error
# subscription added to +subscription+.
#
# @param subscription [#add] receives the releasing subscription
# @param timeout [Numeric, nil] forwarded to the latch's +wait+
# @return [Object] whatever the latch's +wait+ returns
def await(subscription, timeout = nil)
  gate = Concurrent::CountDownLatch.new(1)
  release = Subscriptions.on_close_and_error { gate.count_down }
  subscription.add(release)
  gate.wait(timeout)
end

#buffer(count: nil, delay: nil) ⇒ Object



284
285
286
# File 'lib/reacto/trackable.rb', line 284

# Lifts an Operations::Buffer configured with +count+ and +delay+.
#
# @param count [Object, nil] forwarded as the +count:+ option
# @param delay [Object, nil] forwarded as the +delay:+ option
def buffer(count: nil, delay: nil)
  operation = Operations::Buffer.new(count: count, delay: delay)
  lift(operation)
end

#cache(type: :memory, **settings) ⇒ Object



296
297
298
299
# File 'lib/reacto/trackable.rb', line 296

# Lifts an Operations::Cache configured with +type+ plus any extra settings.
#
# @param type [Symbol] forwarded as the +type:+ option
# @param settings [Hash] remaining keyword options, forwarded unchanged
def cache(type: :memory, **settings)
  # A double-splat parameter is always a Hash (empty when nothing extra is
  # passed), so no nil-defaulting is needed before forwarding it.
  lift(Operations::Cache.new(type: type, **settings))
end

#concat(trackable) ⇒ Object



276
277
278
# File 'lib/reacto/trackable.rb', line 276

# Lifts an Operations::Concat built around +trackable+.
#
# @param trackable [Object] argument for Operations::Concat
def concat(trackable)
  operation = Operations::Concat.new(trackable)
  lift(operation)
end

#delay(delay) ⇒ Object



288
289
290
# File 'lib/reacto/trackable.rb', line 288

# Shorthand for +buffer+ with only the +delay:+ option set.
#
# @param delay [Object] forwarded as +delay:+
def delay(delay)
  options = { delay: delay }
  buffer(**options)
end

#depend_on(trackable, key: :data, accumulator: nil, &block) ⇒ Object



301
302
303
304
305
# File 'lib/reacto/trackable.rb', line 301

# Lifts an Operations::DependOn for +trackable+.
#
# @param trackable [Object] first argument for Operations::DependOn
# @param key [Object] forwarded as +key:+
# @param accumulator [Object, nil] forwarded as +accumulator:+ when no block
#   is given
# @yield when a block is passed it is used as the accumulator instead
def depend_on(trackable, key: :data, accumulator: nil, &block)
  chosen_accumulator = block || accumulator
  operation = Operations::DependOn.new(
    trackable, key: key, accumulator: chosen_accumulator
  )
  lift(operation)
end

#diff(initial = NO_VALUE, fn = Operations::Diff::DEFAULT_FN, &block) ⇒ Object



236
237
238
# File 'lib/reacto/trackable.rb', line 236

# Lifts an Operations::Diff built from a function and an initial value.
#
# @param initial [Object] second argument for Operations::Diff
# @param fn [Object] used when no block is given
# @yield when a block is passed it replaces +fn+
def diff(initial = NO_VALUE, fn = Operations::Diff::DEFAULT_FN, &block)
  differ = block || fn
  lift(Operations::Diff.new(differ, initial))
end

#do_track(subscription) ⇒ Object



334
335
336
337
338
339
340
# File 'lib/reacto/trackable.rb', line 334

# Runs the stored behaviour for +subscription+: posted to the executor when
# one is set, called directly otherwise.
#
# @param subscription [Object] passed to the behaviour
def do_track(subscription)
  return @behaviour.call(subscription) unless @executor

  @executor.post(subscription, &@behaviour)
end

#drop(how_many_to_drop) ⇒ Object Also known as: skip



240
241
242
# File 'lib/reacto/trackable.rb', line 240

# Lifts an Operations::Drop built with +how_many_to_drop+.
#
# @param how_many_to_drop [Object] argument for Operations::Drop
def drop(how_many_to_drop)
  operation = Operations::Drop.new(how_many_to_drop)
  lift(operation)
end

#drop_errors ⇒ Object Also known as: skip_errors



244
245
246
# File 'lib/reacto/trackable.rb', line 244

# Lifts a fresh Operations::DropErrors.
def drop_errors
  operation = Operations::DropErrors.new
  lift(operation)
end

#execute_on(executor) ⇒ Object



321
322
323
# File 'lib/reacto/trackable.rb', line 321

# Creates a new instance of the receiver's class with the same behaviour but
# the given executor.
#
# @param executor [Object] executor for the new instance
def execute_on(executor)
  trackable_class = self.class
  trackable_class.new(@behaviour, executor)
end

#first ⇒ Object



260
261
262
# File 'lib/reacto/trackable.rb', line 260

# Shorthand for +take+ with a count of one.
def first
  count = 1
  take(count)
end

#flat_map(transform = nil, &block) ⇒ Object



210
211
212
# File 'lib/reacto/trackable.rb', line 210

# Lifts an Operations::FlatMap built from a transform.
#
# @param transform [Object, nil] used when no block is given
# @yield when a block is passed it replaces +transform+
def flat_map(transform = nil, &block)
  mapper = block || transform
  lift(Operations::FlatMap.new(mapper))
end

#flat_map_latest(transform = nil, &block) ⇒ Object



214
215
216
# File 'lib/reacto/trackable.rb', line 214

# Lifts an Operations::FlatMapLatest built from a transform.
#
# @param transform [Object, nil] used when no block is given
# @yield when a block is passed it replaces +transform+
def flat_map_latest(transform = nil, &block)
  mapper = block || transform
  lift(Operations::FlatMapLatest.new(mapper))
end

#flatten ⇒ Object



256
257
258
# File 'lib/reacto/trackable.rb', line 256

# Lifts a fresh Operations::Flatten.
def flatten
  operation = Operations::Flatten.new
  lift(operation)
end

#inject(initial = NO_VALUE, injector = nil, &block) ⇒ Object



232
233
234
# File 'lib/reacto/trackable.rb', line 232

# Lifts an Operations::Inject built from an injector and an initial value.
#
# @param initial [Object] second argument for Operations::Inject
# @param injector [Object, nil] used when no block is given
# @yield when a block is passed it replaces +injector+
def inject(initial = NO_VALUE, injector = nil, &block)
  reducer = block || injector
  lift(Operations::Inject.new(reducer, initial))
end

#label(labeling_action = nil, executor: nil, &block) ⇒ Object



307
308
309
310
311
# File 'lib/reacto/trackable.rb', line 307

# Lifts an Operations::Label built from a labeling action and an executor.
#
# @param labeling_action [Object, nil] used when no block is given
# @param executor [Object, nil] second argument for Operations::Label
# @yield when a block is passed it replaces +labeling_action+
def label(labeling_action = nil, executor: nil, &block)
  labeler = block || labeling_action
  lift(Operations::Label.new(labeler, executor))
end

#last ⇒ Object



268
269
270
# File 'lib/reacto/trackable.rb', line 268

# Lifts a fresh Operations::Last.
def last
  operation = Operations::Last.new
  lift(operation)
end

#lift(operation = nil, &block) ⇒ Object



198
199
200
201
202
203
204
205
206
207
208
# File 'lib/reacto/trackable.rb', line 198

# Creates a new instance of the receiver's class (keeping this trackable's
# executor) whose behaviour passes each incoming subscription through
# +operation+ and hands the result to +lift_behaviour+.
#
# @param operation [#call, nil] receives the tracker subscription; ignored
#   when a block is given
# @yield [tracker_subscription] used as the operation when present
# @return [Object] the result of +self.class.new+
def lift(operation = nil, &block)
  operation = block || operation
  self.class.new(nil, @executor) do |tracker_subscription|
    begin
      modified = operation.call(tracker_subscription)
      # lift_behaviour is skipped when the operation returns NOTHING.
      lift_behaviour(modified) unless modified == NOTHING
    rescue StandardError => e
      # Only StandardError is routed to the subscriber. SystemExit, Interrupt
      # and other non-standard exceptions propagate instead of being
      # swallowed into on_error.
      tracker_subscription.on_error(e)
    end
  end
end

#map(mapping = nil, error: nil, close: nil, &block) ⇒ Object



218
219
220
221
222
# File 'lib/reacto/trackable.rb', line 218

# Lifts an Operations::Map built from a mapping plus optional +error:+ and
# +close:+ options.
#
# @param mapping [Object, nil] used when no block is given
# @param error [Object, nil] forwarded as +error:+
# @param close [Object, nil] forwarded as +close:+
# @yield when a block is passed it replaces +mapping+
def map(mapping = nil, error: nil, close: nil, &block)
  mapper = block || mapping
  operation = Operations::Map.new(mapper, error: error, close: close)
  lift(operation)
end

#merge(trackable, delay_error: false) ⇒ Object



280
281
282
# File 'lib/reacto/trackable.rb', line 280

# Lifts an Operations::Merge for +trackable+.
#
# @param trackable [Object] first argument for Operations::Merge
# @param delay_error [Object] forwarded as +delay_error:+
def merge(trackable, delay_error: false)
  operation = Operations::Merge.new(trackable, delay_error: delay_error)
  lift(operation)
end

#off(notification_tracker = nil) ⇒ Object



185
186
187
# File 'lib/reacto/trackable.rb', line 185

# NOTE(review): currently a no-op — the body holds only a placeholder comment,
# so +notification_tracker+ is ignored and nil is returned.
def off(notification_tracker = nil)
  # Clean-up logic
end

#on(trackers = {}) ⇒ Object



176
177
178
179
180
181
182
183
# File 'lib/reacto/trackable.rb', line 176

# Tracks this trackable with a Tracker built from the +trackers+ hash.
#
# @param trackers [Hash] handlers keyed by topic; every key must be in TOPICS
# @raise [RuntimeError] when any key is not listed in TOPICS
# @return [Object] whatever +track+ returns
def on(trackers = {})
  supported = (trackers.keys - TOPICS).empty?
  return track(Tracker.new(trackers)) if supported

  raise "This Trackable supports only #{TOPICS}, but #{trackers.keys} " \
    'were passed.'
end

#prepend(enumerable) ⇒ Object



272
273
274
# File 'lib/reacto/trackable.rb', line 272

# Lifts an Operations::Prepend built around +enumerable+.
#
# @param enumerable [Object] argument for Operations::Prepend
def prepend(enumerable)
  operation = Operations::Prepend.new(enumerable)
  lift(operation)
end

#select(filter = nil, &block) ⇒ Object



228
229
230
# File 'lib/reacto/trackable.rb', line 228

# Lifts an Operations::Select built from a filter.
#
# @param filter [Object, nil] used when no block is given
# @yield when a block is passed it replaces +filter+
def select(filter = nil, &block)
  predicate = block || filter
  lift(Operations::Select.new(predicate))
end

#take(how_many_to_take) ⇒ Object



248
249
250
# File 'lib/reacto/trackable.rb', line 248

# Lifts an Operations::Take built with +how_many_to_take+.
#
# @param how_many_to_take [Object] argument for Operations::Take
def take(how_many_to_take)
  operation = Operations::Take.new(how_many_to_take)
  lift(operation)
end

#throttle(delay) ⇒ Object



292
293
294
# File 'lib/reacto/trackable.rb', line 292

# Lifts an Operations::Throttle built with +delay+.
#
# @param delay [Object] argument for Operations::Throttle
def throttle(delay)
  operation = Operations::Throttle.new(delay)
  lift(operation)
end

#track(notification_tracker) ⇒ Object



189
190
191
192
193
194
195
196
# File 'lib/reacto/trackable.rb', line 189

# Subscribes +notification_tracker+ to this trackable.
#
# A TrackerSubscription is created for the tracker and this trackable, run
# through +do_track+, and returned wrapped in a SubscriptionWrapper.
#
# @param notification_tracker [Object] the tracker to subscribe
# @return [Subscriptions::SubscriptionWrapper]
def track(notification_tracker)
  subscription = Subscriptions::TrackerSubscription
    .new(notification_tracker, self)
    .tap { |tracked| do_track(tracked) }

  Subscriptions::SubscriptionWrapper.new(subscription)
end

#track_on(executor) ⇒ Object



317
318
319
# File 'lib/reacto/trackable.rb', line 317

# Lifts an Operations::TrackOn built with +executor+.
#
# @param executor [Object] argument for Operations::TrackOn
def track_on(executor)
  operation = Operations::TrackOn.new(executor)
  lift(operation)
end

#uniq ⇒ Object



252
253
254
# File 'lib/reacto/trackable.rb', line 252

# Lifts a fresh Operations::Uniq.
def uniq
  operation = Operations::Uniq.new
  lift(operation)
end

#wrap(**args) ⇒ Object



224
225
226
# File 'lib/reacto/trackable.rb', line 224

# Lifts an Operations::Wrap that receives the keyword arguments as a single
# Hash.
#
# @param args [Hash] keyword arguments, passed on as one positional Hash
def wrap(**args)
  operation = Operations::Wrap.new(args)
  lift(operation)
end