Class: Reactive::Observable::Base
- Inherits:
-
Object
- Object
- Reactive::Observable::Base
show all
- Defined in:
- lib/Reactive/observable/base.rb
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(opts = {}) ⇒ Base
Returns a new instance of Base.
24
25
26
27
28
|
# File 'lib/Reactive/observable/base.rb', line 24
def initialize(opts = {})
self.class.attributes.each { |name, default|
instance_variable_set(:"@#{name}", opts[name] || default)
}
end
|
Class Method Details
.add_attributes(*v) ⇒ Object
8
9
10
11
12
|
# File 'lib/Reactive/observable/base.rb', line 8
def self.add_attributes(*v)
defaults = v.last.kind_of?(Hash) ? v.pop.to_a : []
v.map! {|x| [x,nil] }
self.attributes += v + defaults
end
|
.observer_on_next(&method) ⇒ Object
14
15
16
17
18
19
|
# File 'lib/Reactive/observable/base.rb', line 14
def self.observer_on_next(&method)
observer = Class.new(Reactive::ObserverWrapper) do
define_method(:on_next, &method)
end
const_set(:Observer, observer)
end
|
Instance Method Details
#first(count = 1) ⇒ Object
74
75
76
|
# File 'lib/Reactive/observable/base.rb', line 74
def first(count = 1)
First.new(target: self, count: count)
end
|
#grep(predicate) ⇒ Object
70
71
72
|
# File 'lib/Reactive/observable/base.rb', line 70
def grep(predicate)
Grep.new(target: self, predicate: predicate)
end
|
#map(&proc) ⇒ Object
66
67
68
|
# File 'lib/Reactive/observable/base.rb', line 66
def map(&proc)
Map.new(target: self, mapper: proc)
end
|
#maybe_scheduler(arg = nil) ⇒ Object
54
55
56
|
# File 'lib/Reactive/observable/base.rb', line 54
def maybe_scheduler(arg = nil)
arg ? {scheduler => arg} : {}
end
|
#merge(observable) ⇒ Object
60
61
62
63
64
|
# File 'lib/Reactive/observable/base.rb', line 60
def merge(observable)
observable ? Merge.new(o1: self, o2: observable) : MergeNotifications(self)
end
|
#observer_args(observer, parent) ⇒ Object
30
31
32
33
34
35
36
|
# File 'lib/Reactive/observable/base.rb', line 30
def observer_args(observer, parent)
opts = self.class.attributes.each_with_object({}) do |attr, hash|
hash[attr[0]] = instance_variable_get(:"@#{attr[0]}")
end
[observer, parent, opts]
end
|
#push(observable) ⇒ Object
78
79
80
|
# File 'lib/Reactive/observable/base.rb', line 78
def push(observable)
Push.new(o1: self, o2: observable)
end
|
#scheduler ⇒ Object
38
39
40
|
# File 'lib/Reactive/observable/base.rb', line 38
def scheduler
@scheduler ||= Reactive::Scheduler.new
end
|
#skip(count) ⇒ Object
82
83
84
|
# File 'lib/Reactive/observable/base.rb', line 82
def skip(count)
Skip.new(target: self, count: count)
end
|
#subscribe(handlers) ⇒ Object
42
43
44
45
46
47
|
# File 'lib/Reactive/observable/base.rb', line 42
def subscribe(handlers)
[:on_next, :on_error, :on_complete].each {|h| handlers[h] ||= ->(*v) { } }
observer = Reactive::Observer.new(handlers)
self.subscribe_observer(observer)
end
|
#subscribe_observer(observer) ⇒ Object
49
50
51
|
# File 'lib/Reactive/observable/base.rb', line 49
def subscribe_observer(observer)
self.run(observer)
end
|