Class: Reactive::Observable::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/Reactive/observable/base.rb

Direct Known Subclasses

Composite, FromProc, Generate

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Base

Returns a new instance of Base.



24
25
26
27
28
# File 'lib/Reactive/observable/base.rb', line 24

# Sets one instance variable per attribute declared on the class.
#
# For every [name, default] pair in +self.class.attributes+, stores
# +opts[name]+ in +@name+. The declared default is used only when the caller
# supplied nothing for that name (missing key or explicit +nil+); an explicit
# +false+ is kept, so attributes with a truthy default can be switched off.
#
# @param opts [Hash] attribute values keyed by attribute name
def initialize(opts = {})
  self.class.attributes.each do |name, default|
    supplied = opts[name]
    # `opts[name] || default` would also throw away an explicit `false`.
    instance_variable_set(:"@#{name}", supplied.nil? ? default : supplied)
  end
end

Class Method Details

.add_attributes(*v) ⇒ Object



8
9
10
11
12
# File 'lib/Reactive/observable/base.rb', line 8

# Appends attribute declarations to the class-level +attributes+ list.
#
# Bare names are recorded as [name, nil]. If the last argument is a Hash,
# its name => default pairs are appended after the bare names.
#
# @param v [Array] attribute names, optionally followed by a Hash of defaults
# @return [Array] the updated attributes list
def self.add_attributes(*v)
  with_defaults = v.last.is_a?(Hash) ? v.pop : {}
  without_defaults = v.map { |attr_name| [attr_name, nil] }
  self.attributes = self.attributes + without_defaults + with_defaults.to_a
end

.observer_on_next(&method) ⇒ Object



14
15
16
17
18
19
# File 'lib/Reactive/observable/base.rb', line 14

# Defines the nested +Observer+ constant for this class.
#
# Creates an anonymous subclass of Reactive::ObserverWrapper whose +on_next+
# is the given block, then registers it as +Observer+ via +const_set+.
#
# @yield body used as the generated class's +on_next+ method
# @return [Class] the generated observer class
def self.observer_on_next(&method)
  wrapper_class = Class.new(Reactive::ObserverWrapper)
  # `send` keeps this working where define_method is still private.
  wrapper_class.send(:define_method, :on_next, &method)
  const_set(:Observer, wrapper_class)
end

Instance Method Details

#first(count = 1) ⇒ Object



74
75
76
# File 'lib/Reactive/observable/base.rb', line 74

# Wraps this observable in a First, passing itself as +:target+.
# Presumably First limits the stream to the leading +count+ items — confirm
# against the First class.
#
# @param count [Integer] forwarded as +:count+ (defaults to 1)
# @return [First]
def first(count = 1)
  First.new(
    target: self,
    count: count
  )
end

#grep(predicate) ⇒ Object



70
71
72
# File 'lib/Reactive/observable/base.rb', line 70

# Wraps this observable in a Grep, passing itself as +:target+.
# How +predicate+ is applied is decided by Grep, which is not shown here.
#
# @param predicate [Object] forwarded unchanged as +:predicate+
# @return [Grep]
def grep(predicate)
  Grep.new(
    target: self,
    predicate: predicate
  )
end

#map(&proc) ⇒ Object



66
67
68
# File 'lib/Reactive/observable/base.rb', line 66

# Wraps this observable in a Map, passing itself as +:target+ and the given
# block as +:mapper+ (+nil+ when no block is supplied).
#
# @yield block forwarded to Map as the mapper
# @return [Map]
def map(&proc)
  Map.new(
    target: self,
    mapper: proc
  )
end

#maybe_scheduler(arg = nil) ⇒ Object

!!!



54
55
56
# File 'lib/Reactive/observable/base.rb', line 54

# Wraps an optional scheduler in an options hash.
#
# @param arg [Object, nil] scheduler to pass along, if any
# @return [Hash] +{ :scheduler => arg }+ when +arg+ is truthy, otherwise +{}+
def maybe_scheduler(arg = nil)
  # The key must be the literal symbol: a bare `scheduler` here would call
  # the #scheduler reader and use a Scheduler instance as the hash key.
  arg ? { :scheduler => arg } : {}
end

#merge(observable) ⇒ Object

joining



60
61
62
63
64
# File 'lib/Reactive/observable/base.rb', line 60

# Combines this observable with another one.
#
# With a truthy +observable+, builds a Merge with this object as +:o1+ and
# +observable+ as +:o2+. Otherwise falls through to MergeNotifications(self).
#
# NOTE(review): MergeNotifications(self) is a *method call* with a
# capitalised name, unlike the +Klass.new(...)+ pattern used by the other
# operators — confirm such a method exists, or whether
# +MergeNotifications.new(...)+ was intended.
#
# @param observable [Object, nil] the other observable
# @return [Merge, Object]
def merge(observable)
  return Merge.new(o1: self, o2: observable) if observable

  MergeNotifications(self)
end

#observer_args(observer, parent) ⇒ Object



30
31
32
33
34
35
36
# File 'lib/Reactive/observable/base.rb', line 30

# Builds the argument triple +[observer, parent, opts]+.
#
# +opts+ maps each declared attribute name (the first element of every entry
# in +self.class.attributes+) to the current value of the instance variable
# of the same name.
#
# @param observer [Object] returned as the first element
# @param parent [Object] returned as the second element
# @return [Array] +[observer, parent, opts]+
def observer_args(observer, parent)
  opts = {}
  self.class.attributes.each do |entry|
    key = entry[0]
    opts[key] = instance_variable_get(:"@#{key}")
  end
  [observer, parent, opts]
end

#push(observable) ⇒ Object



78
79
80
# File 'lib/Reactive/observable/base.rb', line 78

# Builds a Push with this observable as +:o1+ and +observable+ as +:o2+.
# What Push does with the pair is not shown here.
#
# @param observable [Object] the other observable
# @return [Push]
def push(observable)
  Push.new(
    o1: self,
    o2: observable
  )
end

#scheduler ⇒ Object



38
39
40
# File 'lib/Reactive/observable/base.rb', line 38

# Returns this observable's scheduler, creating a Reactive::Scheduler the
# first time it is asked for and caching it in +@scheduler+.
#
# @return [Object] the cached or newly created scheduler
def scheduler
  return @scheduler if @scheduler

  @scheduler = Reactive::Scheduler.new
end

#skip(count) ⇒ Object



82
83
84
# File 'lib/Reactive/observable/base.rb', line 82

# Wraps this observable in a Skip, passing itself as +:target+.
# Presumably Skip drops the leading +count+ items — confirm against the
# Skip class.
#
# @param count [Integer] forwarded as +:count+
# @return [Skip]
def skip(count)
  Skip.new(
    target: self,
    count: count
  )
end

#subscribe(handlers) ⇒ Object



42
43
44
45
46
47
# File 'lib/Reactive/observable/base.rb', line 42

# Subscribes a set of callbacks to this observable.
#
# Any of +:on_next+, +:on_error+ and +:on_complete+ that is missing (or
# +nil+/+false+) is replaced by a lambda that accepts any arguments and does
# nothing. The completed set is wrapped in a Reactive::Observer and handed
# to #subscribe_observer.
#
# The defaults are filled into a copy, so the caller's hash is left
# untouched and may be frozen or reused across subscriptions.
#
# @param handlers [Hash] callbacks keyed by +:on_next+ / +:on_error+ /
#   +:on_complete+; may be omitted
# @return [Object] whatever #subscribe_observer returns
def subscribe(handlers = {})
  noop = ->(*_args) {}
  callbacks = handlers.dup
  [:on_next, :on_error, :on_complete].each { |name| callbacks[name] ||= noop }
  subscribe_observer(Reactive::Observer.new(callbacks))
end

#subscribe_observer(observer) ⇒ Object



49
50
51
# File 'lib/Reactive/observable/base.rb', line 49

# Subscribes an already-built observer by passing it to #run.
# +run+ is not defined in this view; presumably each concrete observable
# supplies it — confirm.
#
# @param observer [Object] the observer to hand to +run+
# @return [Object] whatever +run+ returns
def subscribe_observer(observer)
  run(observer)
end